This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 58f171035fa branch-4.0: [feat](storage) introduce backpressure 
algorithm to control version number (part I) #57133 (#57416)
58f171035fa is described below

commit 58f171035fad18913cae0ca4fc35b814778e0bda
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 24 12:45:22 2025 +0800

    branch-4.0: [feat](storage) introduce backpressure algorithm to control 
version number (part I) #57133 (#57416)
    
    Cherry-picked from #57133
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/common/config.cpp                           |  5 ++
 be/src/common/config.h                             |  5 ++
 be/src/olap/delta_writer.cpp                       | 16 ++++++
 be/src/olap/delta_writer.h                         |  3 ++
 be/src/olap/rowset_builder.h                       |  5 +-
 be/src/runtime/tablets_channel.cpp                 |  6 +++
 be/src/runtime/tablets_channel.h                   |  1 +
 be/src/vec/sink/writer/vtablet_writer.cpp          | 57 ++++++++++++++++++++++
 be/src/vec/sink/writer/vtablet_writer.h            | 10 ++++
 .../test_load_back_pressure_version.groovy         | 46 +++++++++++++++++
 10 files changed, 152 insertions(+), 2 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5e7871814fb..6b2175f1a2e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -852,6 +852,11 @@ DEFINE_mInt32(max_tablet_version_num, "2000");
 
 DEFINE_mInt32(time_series_max_tablet_version_num, "20000");
 
+// the max sleep time when meeting high pressure load task
+DEFINE_mInt64(max_load_back_pressure_version_wait_time_ms, "3000");
+// the percentage of a tablet's max version num above which load back pressure is triggered
+DEFINE_mInt64(load_back_pressure_version_threshold, "80"); // 80%
+
 // Frontend mainly use two thrift sever type: THREAD_POOL, THREADED_SELECTOR. 
if fe use THREADED_SELECTOR model for thrift server,
 // the thrift_server_type_of_fe should be set THREADED_SELECTOR to make be 
thrift client to fe constructed with TFramedTransport
 DEFINE_String(thrift_server_type_of_fe, "THREAD_POOL");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4d3b1a73420..72602834d54 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -877,6 +877,11 @@ DECLARE_mInt32(max_tablet_version_num);
 
 DECLARE_mInt32(time_series_max_tablet_version_num);
 
+// the max sleep time when meeting high pressure load task
+DECLARE_mInt64(max_load_back_pressure_version_wait_time_ms);
+// the percentage of a tablet's max version num above which load back pressure is triggered
+DECLARE_mInt64(load_back_pressure_version_threshold);
+
 // Frontend mainly use two thrift sever type: THREAD_POOL, THREADED_SELECTOR. 
if fe use THREADED_SELECTOR model for thrift server,
 // the thrift_server_type_of_fe should be set THREADED_SELECTOR to make be 
thrift client to fe constructed with TFramedTransport
 DECLARE_String(thrift_server_type_of_fe);
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index b96776c19b3..2098711a9e0 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -98,6 +98,22 @@ BaseDeltaWriter::~BaseDeltaWriter() {
     }
 }
 
+// Appends this writer's tablet to `tablet_infos` once its version count exceeds
+// load_back_pressure_version_threshold percent of the tablet's max version config.
+void BaseDeltaWriter::set_tablet_load_rowset_num_info(
+        google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_infos) {
+    if (auto* cur_tablet = _rowset_builder->tablet().get(); cur_tablet != nullptr) {
+        const auto max_versions = cur_tablet->max_version_config();
+        const auto cur_versions = cur_tablet->tablet_meta()->version_count();
+        const auto limit = max_versions * config::load_back_pressure_version_threshold / 100;
+        if (UNLIKELY(cur_versions > limit)) {
+            auto* entry = tablet_infos->Add();
+            entry->set_current_rowset_nums(static_cast<int32_t>(cur_versions));
+            entry->set_max_config_rowset_nums(max_versions);
+        }
+    }
+}
+
 DeltaWriter::~DeltaWriter() = default;
 
 Status BaseDeltaWriter::init() {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index f840e4f8aa1..1795d834556 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -93,6 +93,9 @@ public:
 
     int64_t num_rows_filtered() const;
 
+    void set_tablet_load_rowset_num_info(
+            google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* 
tablet_info);
+
 protected:
     virtual void _init_profile(RuntimeProfile* profile);
 
diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h
index 8da8fe8ad60..99797354347 100644
--- a/be/src/olap/rowset_builder.h
+++ b/be/src/olap/rowset_builder.h
@@ -126,6 +126,9 @@ public:
 
     Status commit_txn();
 
+    // Cast `BaseTablet` to `Tablet`
+    Tablet* tablet();
+
 private:
     void _init_profile(RuntimeProfile* profile) override;
 
@@ -135,8 +138,6 @@ private:
 
     void _garbage_collection();
 
-    // Cast `BaseTablet` to `Tablet`
-    Tablet* tablet();
     TabletSharedPtr tablet_sptr();
 
     StorageEngine& _engine;
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 460879978cd..1bb7de0c19f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -614,10 +614,16 @@ Status BaseTabletsChannel::_write_block_data(
     };
 
     SCOPED_TIMER(_write_block_timer);
+    auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos();
     for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
         RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, 
[&](BaseDeltaWriter* writer) {
             return writer->write(&send_data, tablet_to_rowidxs_it.second);
         }));
+
+        auto tablet_writer_it = 
_tablet_writers.find(tablet_to_rowidxs_it.first);
+        if (tablet_writer_it != _tablet_writers.end()) {
+            
tablet_writer_it->second->set_tablet_load_rowset_num_info(tablet_load_infos);
+        }
     }
 
     {
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index a8e2cc96f01..46231bc75e6 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -50,6 +50,7 @@ class PSuccessSlaveTabletNodeIds;
 class PTabletError;
 class PTabletInfo;
 class PTabletWriterOpenRequest;
+class PTabletWriterOpenResult;
 class PTabletWriterAddBlockRequest;
 class PTabletWriterAddBlockResult;
 class PUniqueId;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index fe51df4dcb2..fb864435cdf 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -45,6 +45,7 @@
 #include <vector>
 
 #include "cloud/config.h"
+#include "common/config.h"
 #include "cpp/sync_point.h"
 #include "util/runtime_profile.h"
 #include "vec/data_types/data_type.h"
@@ -100,6 +101,8 @@ bvar::PerSecond<bvar::Adder<int64_t>> 
g_sink_write_bytes_per_second("sink_throug
 bvar::Adder<int64_t> g_sink_write_rows;
 bvar::PerSecond<bvar::Adder<int64_t>> 
g_sink_write_rows_per_second("sink_throughput_row",
                                                                    
&g_sink_write_rows, 60);
+bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms(
+        "load_back_pressure_version_time_ms");
 
 Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPartition>& tablets,
                           bool incremental) {
@@ -806,6 +809,15 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* 
state,
         return 0;
     }
 
+    auto load_back_pressure_version_wait_time_ms = 
_load_back_pressure_version_wait_time_ms.load();
+    if (UNLIKELY(load_back_pressure_version_wait_time_ms > 0)) {
+        std::this_thread::sleep_for(
+                
std::chrono::milliseconds(load_back_pressure_version_wait_time_ms));
+        _load_back_pressure_version_block_ms.fetch_add(
+                load_back_pressure_version_wait_time_ms); // already in 
milliseconds
+        _load_back_pressure_version_wait_time_ms = 0;
+    }
+
     // set closure for sending block.
     if (!_send_block_callback->try_set_in_flight()) {
         // There is packet in flight, skip.
@@ -839,6 +851,45 @@ void VNodeChannel::_cancel_with_msg(const std::string& 
msg) {
     _cancelled = true;
 }
 
+// Derives the back pressure wait time from the over-threshold tablets listed in
+// `tablet_load_infos`; try_send_and_fetch_status() sleeps for it before the next send.
+void VNodeChannel::_refresh_back_pressure_version_wait_time(
+        const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
+                tablet_load_infos) {
+    int64_t max_rowset_num_gap = 0;
+    // if any one tablet is under high load pressure, we make the whole procedure
+    // sleep to prevent the corresponding BE from returning -235
+    for (const auto& load_info : tablet_load_infos) {
+        int64_t cur_rowset_num = load_info.current_rowset_nums();
+        // multiply first: `threshold / 100` alone is integer division and yields 0 below 100
+        int64_t high_load_point = load_info.max_config_rowset_nums() *
+                                  config::load_back_pressure_version_threshold / 100;
+        DCHECK(cur_rowset_num > high_load_point);
+        max_rowset_num_gap = std::max(max_rowset_num_gap, cur_rowset_num - high_load_point);
+    }
+    // to slow down the high load pressure, we use the rowset num gap to calculate one sleep time
+    // for example:
+    // if the max tablet version is 2000, there are 3 BE
+    // A: ====================  1800
+    // B: ===================   1700
+    // C: ==================    1600
+    //    ==================    1600
+    //                      ^
+    //                      the high load point
+    // then the max gap is 1800 - (max tablet version * threshold / 100) = 200,
+    // and the whole send procedure sleeps 200 + 1000 = 1200ms (capped by
+    // max_load_back_pressure_version_wait_time_ms) so compaction can reduce the pressure
+    if (UNLIKELY(max_rowset_num_gap > 0)) {
+        // log the computed value: the sender may already have reset the atomic to 0
+        const int64_t wait_time_ms = std::min<int64_t>(
+                max_rowset_num_gap + 1000, config::max_load_back_pressure_version_wait_time_ms);
+        _load_back_pressure_version_wait_time_ms.store(wait_time_ms);
+        LOG(INFO) << "try to back pressure version, wait time(ms): " << wait_time_ms
+                  << ", load id: " << print_id(_parent->_load_id)
+                  << ", max_rowset_num_gap: " << max_rowset_num_gap;
+    }
+}
+
 void VNodeChannel::try_send_pending_block(RuntimeState* state) {
     SCOPED_ATTACH_TASK(state);
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker);
@@ -1007,6 +1058,7 @@ void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult
     SCOPED_ATTACH_TASK(_state);
     Status status(Status::create(result.status()));
     if (status.ok()) {
+        
_refresh_back_pressure_version_wait_time(result.tablet_load_rowset_num_infos());
         // if has error tablet, handle them first
         for (const auto& error : result.tablet_errors()) {
             _index_channel->mark_as_failed(this, "tablet error: " + 
error.msg(), error.tablet_id());
@@ -1535,6 +1587,7 @@ Status VTabletWriter::_init(RuntimeState* state, 
RuntimeProfile* profile) {
     _max_wait_exec_timer = ADD_TIMER(profile, "MaxWaitExecTime");
     _add_batch_number = ADD_COUNTER(profile, "NumberBatchAdded", TUnit::UNIT);
     _num_node_channels = ADD_COUNTER(profile, "NumberNodeChannels", 
TUnit::UNIT);
+    _load_back_pressure_version_time_ms = ADD_TIMER(profile, 
"LoadBackPressureVersionTimeMs");
 
 #ifdef DEBUG
     // check: tablet ids should be unique
@@ -1869,6 +1922,10 @@ Status VTabletWriter::close(Status exec_status) {
             COUNTER_SET(_max_wait_exec_timer, 
writer_stats.max_wait_exec_time_ns);
             COUNTER_SET(_add_batch_number, writer_stats.total_add_batch_num);
             COUNTER_SET(_num_node_channels, writer_stats.num_node_channels);
+            COUNTER_SET(_load_back_pressure_version_time_ms,
+                        writer_stats.load_back_pressure_version_time_ms);
+            g_sink_load_back_pressure_version_time_ms
+                    << writer_stats.load_back_pressure_version_time_ms;
 
             // _number_input_rows don't contain num_rows_load_filtered and 
num_rows_load_unselected in scan node
             int64_t num_rows_load_total = _number_input_rows + 
_state->num_rows_load_filtered() +
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index 3e0dd298fb0..ba1c0c505d6 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -218,6 +218,7 @@ struct WriterStats {
     int64_t max_wait_exec_time_ns = 0;
     int64_t total_add_batch_num = 0;
     int64_t num_node_channels = 0;
+    int64_t load_back_pressure_version_time_ms = 0;
     VNodeChannelStat channel_stat;
 };
 
@@ -314,6 +315,8 @@ public:
             writer_stats->total_wait_exec_time_ns +=
                     (_add_batch_counter.add_batch_wait_execution_time_us * 
1000);
             writer_stats->total_add_batch_num += 
_add_batch_counter.add_batch_num;
+            writer_stats->load_back_pressure_version_time_ms +=
+                    _load_back_pressure_version_block_ms;
         }
     }
 
@@ -343,6 +346,10 @@ protected:
                                      const WriteBlockCallbackContext& ctx);
     void _add_block_failed_callback(const WriteBlockCallbackContext& ctx);
 
+    void _refresh_back_pressure_version_wait_time(
+            const 
::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
+                    tablet_load_infos);
+
     VTabletWriter* _parent = nullptr;
     IndexChannel* _index_channel = nullptr;
     int64_t _node_id = -1;
@@ -398,6 +405,7 @@ protected:
     std::atomic<int64_t> _serialize_batch_ns {0};
     std::atomic<int64_t> _queue_push_lock_ns {0};
     std::atomic<int64_t> _actual_consume_ns {0};
+    std::atomic<int64_t> _load_back_pressure_version_block_ms {0};
 
     VNodeChannelStat _stat;
     // lock to protect _is_closed.
@@ -436,6 +444,7 @@ protected:
     bool _is_incremental;
 
     std::atomic<int64_t> _write_bytes {0};
+    std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
 };
 
 // an IndexChannel is related to specific table and its rollup and mv
@@ -735,6 +744,7 @@ private:
     RuntimeProfile::Counter* _max_wait_exec_timer = nullptr;
     RuntimeProfile::Counter* _add_batch_number = nullptr;
     RuntimeProfile::Counter* _num_node_channels = nullptr;
+    RuntimeProfile::Counter* _load_back_pressure_version_time_ms = nullptr;
 
     // the timeout of load channels opened by this tablet sink. in second
     int64_t _load_channel_timeout_s = 0;
diff --git 
a/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
 
b/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
new file mode 100644
index 00000000000..48747266306
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_load_back_pressure_version", "nonConcurrent") {
+    sql """ set enable_memtable_on_sink_node=false """
+    def tableName = "test_load_back_pressure_version"
+    sql """ DROP TABLE IF EXISTS ${tableName}"""
+    // memtable-on-sink-node is switched off above and switched back on in the finally block
+    sql """
+        CREATE TABLE IF NOT EXISTS `${tableName}` (
+          `id` BIGINT NOT NULL,
+          `value` int(11) NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        )
+    """
+    // threshold 0: presumably makes this single insert exercise the back pressure path
+    try {
+        set_be_param("load_back_pressure_version_threshold", "0")
+        sql "insert into ${tableName} values(1,1)"
+        def rows = sql "select * from ${tableName}"
+        logger.info("res: " + rows.size())
+        assertTrue(rows.size() == 1)
+    } finally {
+        set_be_param("load_back_pressure_version_threshold", "80")
+        sql """ set enable_memtable_on_sink_node=true """
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to