This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 0b20a78ea8c branch-4.1: [fix](insert overwrite) delay overwrite 
partition routing until incremental open #63209 (#63264)
0b20a78ea8c is described below

commit 0b20a78ea8c492b13716992a39e3a44c1c522c93
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 21 15:42:01 2026 +0800

    branch-4.1: [fix](insert overwrite) delay overwrite partition routing until 
incremental open #63209 (#63264)
    
    Cherry-picked from #63209
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/exec/sink/vrow_distribution.cpp       | 19 ++++++------
 be/test/exec/sink/vrow_distribution_test.cpp | 45 ++++++++++++++++++++++++----
 2 files changed, 50 insertions(+), 14 deletions(-)

diff --git a/be/src/exec/sink/vrow_distribution.cpp 
b/be/src/exec/sink/vrow_distribution.cpp
index f14f8052969..130f21ad364 100644
--- a/be/src/exec/sink/vrow_distribution.cpp
+++ b/be/src/exec/sink/vrow_distribution.cpp
@@ -141,13 +141,13 @@ Status VRowDistribution::automatic_create_partition() {
     Status status(Status::create(result.status));
     VLOG_NOTICE << "automatic partition rpc end response " << result;
     if (result.status.status_code == TStatusCode::OK) {
+        RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
         // add new created partitions
         RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
         for (const auto& part : result.partitions) {
             _new_partition_ids.insert(part.id);
             VLOG_TRACE << "record new id: " << part.id;
         }
-        RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
     }
 
     // Record this request's elapsed time
@@ -159,13 +159,13 @@ Status VRowDistribution::automatic_create_partition() {
 }
 
 // for reuse the same create callback of create-partition
-static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& 
arg) {
+static TCreatePartitionResult cast_as_create_result(const 
TReplacePartitionResult& arg) {
     TCreatePartitionResult result;
     result.status = arg.status;
-    result.nodes = std::move(arg.nodes);
-    result.partitions = std::move(arg.partitions);
-    result.tablets = std::move(arg.tablets);
-    result.slave_tablets = std::move(arg.slave_tablets);
+    result.nodes = arg.nodes;
+    result.partitions = arg.partitions;
+    result.tablets = arg.tablets;
+    result.slave_tablets = arg.slave_tablets;
     return result;
 }
 
@@ -237,6 +237,10 @@ Status VRowDistribution::_replace_overwriting_partition() {
     Status status(Status::create(result.status));
     VLOG_NOTICE << "auto detect replace partition result: " << result;
     if (result.status.status_code == TStatusCode::OK) {
+        // Reuse the function as the args' structure are same. It adds 
nodes/locations
+        // and waits for incremental_open before the new tablets become 
routable.
+        auto result_as_create = cast_as_create_result(result);
+        RETURN_IF_ERROR(_create_partition_callback(_caller, 
&result_as_create));
         // record new partitions
         for (const auto& part : result.partitions) {
             _new_partition_ids.insert(part.id);
@@ -244,9 +248,6 @@ Status VRowDistribution::_replace_overwriting_partition() {
         }
         // replace data in _partitions
         RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, 
result.partitions));
-        // reuse the function as the args' structure are same. it add 
nodes/locations and incremental_open
-        auto result_as_create = cast_as_create_result(result);
-        RETURN_IF_ERROR(_create_partition_callback(_caller, 
&result_as_create));
     }
 
     return status;
diff --git a/be/test/exec/sink/vrow_distribution_test.cpp 
b/be/test/exec/sink/vrow_distribution_test.cpp
index 36765eca44f..82c780a4e2c 100644
--- a/be/test/exec/sink/vrow_distribution_test.cpp
+++ b/be/test/exec/sink/vrow_distribution_test.cpp
@@ -58,10 +58,16 @@ Status _noop_create_partition_callback(void*, 
TCreatePartitionResult*) {
     return Status::OK();
 }
 
+Status _delegated_create_partition_callback(void* caller, 
TCreatePartitionResult* result) {
+    return 
(*static_cast<std::function<Status(TCreatePartitionResult*)>*>(caller))(result);
+}
+
 std::unique_ptr<VRowDistributionHarness> _build_vrow_distribution_harness(
         OperatorContext& ctx, const TOlapTableSchemaParam& tschema,
         const TOlapTablePartitionParam& tpartition, const 
TOlapTableLocationParam& tlocation,
-        TTupleId tablet_sink_tuple_id, int64_t txn_id) {
+        TTupleId tablet_sink_tuple_id, int64_t txn_id,
+        CreatePartitionCallback create_partition_callback = 
&_noop_create_partition_callback,
+        void* caller = nullptr) {
     auto h = std::make_unique<VRowDistributionHarness>();
 
     h->schema = std::make_shared<OlapTableSchemaParam>();
@@ -92,9 +98,9 @@ std::unique_ptr<VRowDistributionHarness> 
_build_vrow_distribution_harness(
     rctx.location = h->location.get();
     rctx.vec_output_expr_ctxs = &h->output_expr_ctxs;
     rctx.schema = h->schema;
-    rctx.caller = nullptr;
+    rctx.caller = caller;
     rctx.write_single_replica = false;
-    rctx.create_partition_callback = &_noop_create_partition_callback;
+    rctx.create_partition_callback = create_partition_callback;
     h->row_distribution.init(rctx);
 
     st = h->row_distribution.open(h->output_row_desc.get());
@@ -358,8 +364,36 @@ TEST(VRowDistributionTest, 
ReplaceOverwritingPartitionInjectedRequestDedupAndRep
     tpartition.__set_overwrite_group_id(123);
     auto tlocation = sink_test_utils::build_location_param();
 
-    auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition, 
tlocation,
-                                              tablet_sink_tuple_id, txn_id);
+    VRowDistributionHarness* harness = nullptr;
+    bool create_callback_called = false;
+    std::function<Status(TCreatePartitionResult*)> create_callback =
+            [&](TCreatePartitionResult* result) {
+                create_callback_called = true;
+                EXPECT_EQ(result->partitions.size(), 2);
+
+                auto old_partition_block = 
ColumnHelper::create_block<DataTypeInt32>({1});
+                VOlapTablePartition* old_part = nullptr;
+                harness->vpartition->find_partition(&old_partition_block, 0, 
old_part);
+                if (old_part == nullptr) {
+                    return Status::InternalError("old partition is not found");
+                }
+                EXPECT_EQ(old_part->id, 1);
+
+                auto another_old_partition_block = 
ColumnHelper::create_block<DataTypeInt32>({25});
+                VOlapTablePartition* another_old_part = nullptr;
+                
harness->vpartition->find_partition(&another_old_partition_block, 0,
+                                                    another_old_part);
+                if (another_old_part == nullptr) {
+                    return Status::InternalError("another old partition is not 
found");
+                }
+                EXPECT_EQ(another_old_part->id, 2);
+                return Status::OK();
+            };
+
+    auto h = _build_vrow_distribution_harness(
+            ctx, tschema, tpartition, tlocation, tablet_sink_tuple_id, txn_id,
+            &_delegated_create_partition_callback, &create_callback);
+    harness = h.get();
 
     doris::config::enable_debug_points = true;
     doris::DebugPoints::instance()->clear();
@@ -424,6 +458,7 @@ TEST(VRowDistributionTest, 
ReplaceOverwritingPartitionInjectedRequestDedupAndRep
                                                             
row_part_tablet_ids, rows_stat_val);
         EXPECT_TRUE(st.ok()) << st.to_string();
         EXPECT_EQ(injected_times, 1);
+        EXPECT_TRUE(create_callback_called);
 
         ASSERT_EQ(row_part_tablet_ids.size(), 1);
         ASSERT_EQ(row_part_tablet_ids[0].partition_ids.size(), 2);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to