This is an automated email from the ASF dual-hosted git repository.

liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d1846591f7 [fix](insert overwrite) delay overwrite partition routing 
until incremental open (#63209)
1d1846591f7 is described below

commit 1d1846591f7b4367f413fb890b82429cf80a2680
Author: hui lai <[email protected]>
AuthorDate: Thu May 14 23:38:09 2026 +0800

    [fix](insert overwrite) delay overwrite partition routing until incremental 
open (#63209)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    In auto-detect insert overwrite, a BE sender could publish the newly
    created temporary (replacement) partitions to its local row routing
    before incremental open had finished on the target BEs.
    
    The race was:
    
    1. One sender calls the FE's `replacePartition` and receives metadata for
    the new temporary partitions and tablets.
    2. That sender records the new partition ids and replaces its local
    `_vpartition` routing first.
    3. Another concurrent batch can then route rows to the new tablet.
    4. The first sender has not finished incremental open yet, so the target
    BE may not have created the delta writer for that tablet.
    5. The target BE returns `unknown tablet to append data`.
    
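    This PR makes the sender finish `_create_partition_callback`, including
    incremental open/open_wait, before publishing the new partitions/tablets
    to local routing and marking the new partitions as handled. The same
    ordering is applied to automatic partition creation. Because the
    `TReplacePartitionResult` is still read after the callback runs,
    `cast_as_create_result` now copies its fields instead of moving them.
    The updated unit test asserts that, while the callback is running, rows
    still route to the old partitions.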
---
 be/src/exec/sink/vrow_distribution.cpp       | 19 ++++++------
 be/test/exec/sink/vrow_distribution_test.cpp | 45 ++++++++++++++++++++++++----
 2 files changed, 50 insertions(+), 14 deletions(-)

diff --git a/be/src/exec/sink/vrow_distribution.cpp 
b/be/src/exec/sink/vrow_distribution.cpp
index 40a20c08d5a..891c4b394cb 100644
--- a/be/src/exec/sink/vrow_distribution.cpp
+++ b/be/src/exec/sink/vrow_distribution.cpp
@@ -145,13 +145,13 @@ Status VRowDistribution::automatic_create_partition() {
     Status status(Status::create(result.status));
     VLOG_NOTICE << "automatic partition rpc end response " << result;
     if (result.status.status_code == TStatusCode::OK) {
+        RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
         // add new created partitions
         RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
         for (const auto& part : result.partitions) {
             _new_partition_ids.insert(part.id);
             VLOG_TRACE << "record new id: " << part.id;
         }
-        RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
     }
 
     // Record this request's elapsed time
@@ -163,13 +163,13 @@ Status VRowDistribution::automatic_create_partition() {
 }
 
 // for reuse the same create callback of create-partition
-static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& 
arg) {
+static TCreatePartitionResult cast_as_create_result(const 
TReplacePartitionResult& arg) {
     TCreatePartitionResult result;
     result.status = arg.status;
-    result.nodes = std::move(arg.nodes);
-    result.partitions = std::move(arg.partitions);
-    result.tablets = std::move(arg.tablets);
-    result.slave_tablets = std::move(arg.slave_tablets);
+    result.nodes = arg.nodes;
+    result.partitions = arg.partitions;
+    result.tablets = arg.tablets;
+    result.slave_tablets = arg.slave_tablets;
     return result;
 }
 
@@ -246,6 +246,10 @@ Status VRowDistribution::_replace_overwriting_partition() {
     Status status(Status::create(result.status));
     VLOG_NOTICE << "auto detect replace partition result: " << result;
     if (result.status.status_code == TStatusCode::OK) {
+        // Reuse the function as the args' structure are same. It adds 
nodes/locations
+        // and waits for incremental_open before the new tablets become 
routable.
+        auto result_as_create = cast_as_create_result(result);
+        RETURN_IF_ERROR(_create_partition_callback(_caller, 
&result_as_create));
         // record new partitions
         for (const auto& part : result.partitions) {
             _new_partition_ids.insert(part.id);
@@ -253,9 +257,6 @@ Status VRowDistribution::_replace_overwriting_partition() {
         }
         // replace data in _partitions
         RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, 
result.partitions));
-        // reuse the function as the args' structure are same. it add 
nodes/locations and incremental_open
-        auto result_as_create = cast_as_create_result(result);
-        RETURN_IF_ERROR(_create_partition_callback(_caller, 
&result_as_create));
     }
 
     return status;
diff --git a/be/test/exec/sink/vrow_distribution_test.cpp 
b/be/test/exec/sink/vrow_distribution_test.cpp
index 9c8e8798779..a5d862f370b 100644
--- a/be/test/exec/sink/vrow_distribution_test.cpp
+++ b/be/test/exec/sink/vrow_distribution_test.cpp
@@ -58,10 +58,16 @@ Status _noop_create_partition_callback(void*, 
TCreatePartitionResult*) {
     return Status::OK();
 }
 
+Status _delegated_create_partition_callback(void* caller, 
TCreatePartitionResult* result) {
+    return 
(*static_cast<std::function<Status(TCreatePartitionResult*)>*>(caller))(result);
+}
+
 std::unique_ptr<VRowDistributionHarness> _build_vrow_distribution_harness(
         OperatorContext& ctx, const TOlapTableSchemaParam& tschema,
         const TOlapTablePartitionParam& tpartition, const 
TOlapTableLocationParam& tlocation,
-        TTupleId tablet_sink_tuple_id, int64_t txn_id) {
+        TTupleId tablet_sink_tuple_id, int64_t txn_id,
+        CreatePartitionCallback create_partition_callback = 
&_noop_create_partition_callback,
+        void* caller = nullptr) {
     auto h = std::make_unique<VRowDistributionHarness>();
 
     h->schema = std::make_shared<OlapTableSchemaParam>();
@@ -91,9 +97,9 @@ std::unique_ptr<VRowDistributionHarness> 
_build_vrow_distribution_harness(
     rctx.location = h->location.get();
     rctx.vec_output_expr_ctxs = &h->output_expr_ctxs;
     rctx.schema = h->schema;
-    rctx.caller = nullptr;
+    rctx.caller = caller;
     rctx.write_single_replica = false;
-    rctx.create_partition_callback = &_noop_create_partition_callback;
+    rctx.create_partition_callback = create_partition_callback;
     h->row_distribution.init(rctx);
 
     st = h->row_distribution.open(h->output_row_desc.get());
@@ -357,8 +363,36 @@ TEST(VRowDistributionTest, 
ReplaceOverwritingPartitionInjectedRequestDedupAndRep
     tpartition.__set_overwrite_group_id(123);
     auto tlocation = sink_test_utils::build_location_param();
 
-    auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition, 
tlocation,
-                                              tablet_sink_tuple_id, txn_id);
+    VRowDistributionHarness* harness = nullptr;
+    bool create_callback_called = false;
+    std::function<Status(TCreatePartitionResult*)> create_callback =
+            [&](TCreatePartitionResult* result) {
+                create_callback_called = true;
+                EXPECT_EQ(result->partitions.size(), 2);
+
+                auto old_partition_block = 
ColumnHelper::create_block<DataTypeInt32>({1});
+                VOlapTablePartition* old_part = nullptr;
+                harness->vpartition->find_partition(&old_partition_block, 0, 
old_part);
+                if (old_part == nullptr) {
+                    return Status::InternalError("old partition is not found");
+                }
+                EXPECT_EQ(old_part->id, 1);
+
+                auto another_old_partition_block = 
ColumnHelper::create_block<DataTypeInt32>({25});
+                VOlapTablePartition* another_old_part = nullptr;
+                
harness->vpartition->find_partition(&another_old_partition_block, 0,
+                                                    another_old_part);
+                if (another_old_part == nullptr) {
+                    return Status::InternalError("another old partition is not 
found");
+                }
+                EXPECT_EQ(another_old_part->id, 2);
+                return Status::OK();
+            };
+
+    auto h = _build_vrow_distribution_harness(
+            ctx, tschema, tpartition, tlocation, tablet_sink_tuple_id, txn_id,
+            &_delegated_create_partition_callback, &create_callback);
+    harness = h.get();
 
     doris::config::enable_debug_points = true;
     doris::DebugPoints::instance()->clear();
@@ -423,6 +457,7 @@ TEST(VRowDistributionTest, 
ReplaceOverwritingPartitionInjectedRequestDedupAndRep
                                                             
row_part_tablet_ids, rows_stat_val);
         EXPECT_TRUE(st.ok()) << st.to_string();
         EXPECT_EQ(injected_times, 1);
+        EXPECT_TRUE(create_callback_called);
 
         ASSERT_EQ(row_part_tablet_ids.size(), 1);
         ASSERT_EQ(row_part_tablet_ids[0].partition_ids.size(), 2);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to