This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 58f171035fa branch-4.0: [feat](storage) introduce backpressure
algorithm to control version number (part I) #57133 (#57416)
58f171035fa is described below
commit 58f171035fad18913cae0ca4fc35b814778e0bda
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 24 12:45:22 2025 +0800
branch-4.0: [feat](storage) introduce backpressure algorithm to control
version number (part I) #57133 (#57416)
Cherry-picked from #57133
Co-authored-by: hui lai <[email protected]>
---
be/src/common/config.cpp | 5 ++
be/src/common/config.h | 5 ++
be/src/olap/delta_writer.cpp | 16 ++++++
be/src/olap/delta_writer.h | 3 ++
be/src/olap/rowset_builder.h | 5 +-
be/src/runtime/tablets_channel.cpp | 6 +++
be/src/runtime/tablets_channel.h | 1 +
be/src/vec/sink/writer/vtablet_writer.cpp | 57 ++++++++++++++++++++++
be/src/vec/sink/writer/vtablet_writer.h | 10 ++++
.../test_load_back_pressure_version.groovy | 46 +++++++++++++++++
10 files changed, 152 insertions(+), 2 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5e7871814fb..6b2175f1a2e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -852,6 +852,11 @@ DEFINE_mInt32(max_tablet_version_num, "2000");
DEFINE_mInt32(time_series_max_tablet_version_num, "20000");
+// upper bound (ms) of the sleep a load sender takes before sending its next block when version back pressure is triggered
+DEFINE_mInt64(max_load_back_pressure_version_wait_time_ms, "3000");
+// percentage of a tablet's max version number; a tablet whose version count exceeds it triggers back pressure
+DEFINE_mInt64(load_back_pressure_version_threshold, "80"); // 80%
+
// Frontend mainly use two thrift sever type: THREAD_POOL, THREADED_SELECTOR.
if fe use THREADED_SELECTOR model for thrift server,
// the thrift_server_type_of_fe should be set THREADED_SELECTOR to make be
thrift client to fe constructed with TFramedTransport
DEFINE_String(thrift_server_type_of_fe, "THREAD_POOL");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4d3b1a73420..72602834d54 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -877,6 +877,11 @@ DECLARE_mInt32(max_tablet_version_num);
DECLARE_mInt32(time_series_max_tablet_version_num);
+// upper bound (ms) of the sleep a load sender takes before sending its next block when version back pressure is triggered
+DECLARE_mInt64(max_load_back_pressure_version_wait_time_ms);
+// percentage of a tablet's max version number; a tablet whose version count exceeds it triggers back pressure
+DECLARE_mInt64(load_back_pressure_version_threshold);
+
// Frontend mainly use two thrift sever type: THREAD_POOL, THREADED_SELECTOR.
if fe use THREADED_SELECTOR model for thrift server,
// the thrift_server_type_of_fe should be set THREADED_SELECTOR to make be
thrift client to fe constructed with TFramedTransport
DECLARE_String(thrift_server_type_of_fe);
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index b96776c19b3..2098711a9e0 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -98,6 +98,22 @@ BaseDeltaWriter::~BaseDeltaWriter() {
}
}
+// Appends one PTabletLoadRowsetInfo entry to `tablet_infos` when the tablet behind this
+// writer holds more versions than the configured percentage of its version limit.
+// Nothing is appended when the tablet is unavailable or is at/below that point.
+void BaseDeltaWriter::set_tablet_load_rowset_num_info(
+        google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_infos) {
+    auto* target_tablet = _rowset_builder->tablet().get();
+    if (target_tablet == nullptr) {
+        return;
+    }
+
+    auto max_version_config = target_tablet->max_version_config();
+    auto version_cnt = target_tablet->tablet_meta()->version_count();
+    // load_back_pressure_version_threshold is a percentage: multiply first, then divide by 100.
+    auto back_pressure_point =
+            max_version_config * config::load_back_pressure_version_threshold / 100;
+
+    if (UNLIKELY(version_cnt > back_pressure_point)) {
+        auto* entry = tablet_infos->Add();
+        entry->set_current_rowset_nums(static_cast<int32_t>(version_cnt));
+        entry->set_max_config_rowset_nums(max_version_config);
+    }
+}
+
DeltaWriter::~DeltaWriter() = default;
Status BaseDeltaWriter::init() {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index f840e4f8aa1..1795d834556 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -93,6 +93,9 @@ public:
int64_t num_rows_filtered() const;
+    // Adds an entry for this writer's tablet to `tablet_infos` when its version count is
+    // above the load back pressure threshold; otherwise `tablet_infos` is left untouched.
+    void set_tablet_load_rowset_num_info(
+            google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_infos);
+
protected:
virtual void _init_profile(RuntimeProfile* profile);
diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h
index 8da8fe8ad60..99797354347 100644
--- a/be/src/olap/rowset_builder.h
+++ b/be/src/olap/rowset_builder.h
@@ -126,6 +126,9 @@ public:
Status commit_txn();
+ // Cast `BaseTablet` to `Tablet`
+ Tablet* tablet();
+
private:
void _init_profile(RuntimeProfile* profile) override;
@@ -135,8 +138,6 @@ private:
void _garbage_collection();
- // Cast `BaseTablet` to `Tablet`
- Tablet* tablet();
TabletSharedPtr tablet_sptr();
StorageEngine& _engine;
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 460879978cd..1bb7de0c19f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -614,10 +614,16 @@ Status BaseTabletsChannel::_write_block_data(
};
SCOPED_TIMER(_write_block_timer);
+ auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos();
for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first,
[&](BaseDeltaWriter* writer) {
return writer->write(&send_data, tablet_to_rowidxs_it.second);
}));
+
+ auto tablet_writer_it =
_tablet_writers.find(tablet_to_rowidxs_it.first);
+ if (tablet_writer_it != _tablet_writers.end()) {
+
tablet_writer_it->second->set_tablet_load_rowset_num_info(tablet_load_infos);
+ }
}
{
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index a8e2cc96f01..46231bc75e6 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -50,6 +50,7 @@ class PSuccessSlaveTabletNodeIds;
class PTabletError;
class PTabletInfo;
class PTabletWriterOpenRequest;
+class PTabletWriterOpenResult;
class PTabletWriterAddBlockRequest;
class PTabletWriterAddBlockResult;
class PUniqueId;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index fe51df4dcb2..fb864435cdf 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -45,6 +45,7 @@
#include <vector>
#include "cloud/config.h"
+#include "common/config.h"
#include "cpp/sync_point.h"
#include "util/runtime_profile.h"
#include "vec/data_types/data_type.h"
@@ -100,6 +101,8 @@ bvar::PerSecond<bvar::Adder<int64_t>>
g_sink_write_bytes_per_second("sink_throug
bvar::Adder<int64_t> g_sink_write_rows;
bvar::PerSecond<bvar::Adder<int64_t>>
g_sink_write_rows_per_second("sink_throughput_row",
&g_sink_write_rows, 60);
+bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms(
+ "load_back_pressure_version_time_ms");
Status IndexChannel::init(RuntimeState* state, const
std::vector<TTabletWithPartition>& tablets,
bool incremental) {
@@ -806,6 +809,15 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState*
state,
return 0;
}
+ auto load_back_pressure_version_wait_time_ms =
_load_back_pressure_version_wait_time_ms.load();
+ if (UNLIKELY(load_back_pressure_version_wait_time_ms > 0)) {
+ std::this_thread::sleep_for(
+
std::chrono::milliseconds(load_back_pressure_version_wait_time_ms));
+ _load_back_pressure_version_block_ms.fetch_add(
+ load_back_pressure_version_wait_time_ms); // already in
milliseconds
+ _load_back_pressure_version_wait_time_ms = 0;
+ }
+
// set closure for sending block.
if (!_send_block_callback->try_set_in_flight()) {
// There is packet in flight, skip.
@@ -839,6 +851,45 @@ void VNodeChannel::_cancel_with_msg(const std::string&
msg) {
_cancelled = true;
}
+// Recomputes the back-pressure sleep of this channel from the per-tablet rowset
+// statistics attached to a successful add-block response.
+//
+// @param tablet_load_infos  one entry per reported tablet, carrying its current rowset
+//                           number and its configured max rowset number.
+//
+// When at least one reported tablet is above the high load point, the wait time that
+// try_send_and_fetch_status() sleeps for before its next send is set to
+// min(max gap + 1000, config::max_load_back_pressure_version_wait_time_ms) milliseconds.
+// When no tablet is above the high load point the stored wait time is left unchanged.
+void VNodeChannel::_refresh_back_pressure_version_wait_time(
+        const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
+                tablet_load_infos) {
+    // Snapshot both config values once so every entry is judged against the same numbers.
+    const int64_t threshold_percent = config::load_back_pressure_version_threshold;
+    const int64_t max_time = config::max_load_back_pressure_version_wait_time_ms;
+
+    int64_t max_rowset_num_gap = 0;
+    // if any one tablet is under high load pressure, we would make the whole procedure
+    // sleep to prevent the corresponding BE return -235
+    for (const auto& load_info : tablet_load_infos) {
+        const int64_t cur_rowset_num = load_info.current_rowset_nums();
+        // Multiply before dividing. `threshold / 100` is an integer division that truncates
+        // to 0 for every threshold below 100, which would put the high load point at 0 and
+        // turn the gap into the raw rowset number. This is the same formula the receiver
+        // uses to decide whether to report a tablet.
+        const int64_t high_load_point =
+                static_cast<int64_t>(load_info.max_config_rowset_nums()) * threshold_percent / 100;
+        // The threshold is read independently here and on the reporting side, so an entry
+        // may be at or below this high load point; std::max keeps such a gap from counting.
+        max_rowset_num_gap = std::max(max_rowset_num_gap, cur_rowset_num - high_load_point);
+    }
+    // to slow down the high load pressure
+    // we would use the rowset num gap to calculate one sleep time
+    // for example:
+    // if the max tablet version is 2000, there are 3 BE
+    // A: ====================  1800
+    // B: ===================   1700
+    // C: ==================    1600
+    //    ==================    1600
+    //                     ^
+    //                     the high load point
+    // then the max gap is
+    // 1800 - (max tablet version * config::load_back_pressure_version_threshold / 100) = 200,
+    // we would make the whole send procedure sleep
+    // 200 + 1000 = 1200ms for compaction to be done to reduce the high pressure
+    if (UNLIKELY(max_rowset_num_gap > 0)) {
+        const int64_t wait_time_ms = std::min(max_rowset_num_gap + 1000, max_time);
+        _load_back_pressure_version_wait_time_ms.store(wait_time_ms);
+        // Log the value computed here rather than re-reading the atomic, which another
+        // thread may already have reset.
+        LOG(INFO) << "try to back pressure version, wait time(ms): " << wait_time_ms
+                  << ", load id: " << print_id(_parent->_load_id)
+                  << ", max_rowset_num_gap: " << max_rowset_num_gap;
+    }
+}
+
void VNodeChannel::try_send_pending_block(RuntimeState* state) {
SCOPED_ATTACH_TASK(state);
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker);
@@ -1007,6 +1058,7 @@ void VNodeChannel::_add_block_success_callback(const
PTabletWriterAddBlockResult
SCOPED_ATTACH_TASK(_state);
Status status(Status::create(result.status()));
if (status.ok()) {
+
_refresh_back_pressure_version_wait_time(result.tablet_load_rowset_num_infos());
// if has error tablet, handle them first
for (const auto& error : result.tablet_errors()) {
_index_channel->mark_as_failed(this, "tablet error: " +
error.msg(), error.tablet_id());
@@ -1535,6 +1587,7 @@ Status VTabletWriter::_init(RuntimeState* state,
RuntimeProfile* profile) {
_max_wait_exec_timer = ADD_TIMER(profile, "MaxWaitExecTime");
_add_batch_number = ADD_COUNTER(profile, "NumberBatchAdded", TUnit::UNIT);
_num_node_channels = ADD_COUNTER(profile, "NumberNodeChannels",
TUnit::UNIT);
+ _load_back_pressure_version_time_ms = ADD_TIMER(profile,
"LoadBackPressureVersionTimeMs");
#ifdef DEBUG
// check: tablet ids should be unique
@@ -1869,6 +1922,10 @@ Status VTabletWriter::close(Status exec_status) {
COUNTER_SET(_max_wait_exec_timer,
writer_stats.max_wait_exec_time_ns);
COUNTER_SET(_add_batch_number, writer_stats.total_add_batch_num);
COUNTER_SET(_num_node_channels, writer_stats.num_node_channels);
+ COUNTER_SET(_load_back_pressure_version_time_ms,
+ writer_stats.load_back_pressure_version_time_ms);
+ g_sink_load_back_pressure_version_time_ms
+ << writer_stats.load_back_pressure_version_time_ms;
// _number_input_rows don't contain num_rows_load_filtered and
num_rows_load_unselected in scan node
int64_t num_rows_load_total = _number_input_rows +
_state->num_rows_load_filtered() +
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index 3e0dd298fb0..ba1c0c505d6 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -218,6 +218,7 @@ struct WriterStats {
int64_t max_wait_exec_time_ns = 0;
int64_t total_add_batch_num = 0;
int64_t num_node_channels = 0;
+ int64_t load_back_pressure_version_time_ms = 0;
VNodeChannelStat channel_stat;
};
@@ -314,6 +315,8 @@ public:
writer_stats->total_wait_exec_time_ns +=
(_add_batch_counter.add_batch_wait_execution_time_us *
1000);
writer_stats->total_add_batch_num +=
_add_batch_counter.add_batch_num;
+ writer_stats->load_back_pressure_version_time_ms +=
+ _load_back_pressure_version_block_ms;
}
}
@@ -343,6 +346,10 @@ protected:
const WriteBlockCallbackContext& ctx);
void _add_block_failed_callback(const WriteBlockCallbackContext& ctx);
+    // Updates the pending back-pressure wait time of this channel from the per-tablet
+    // rowset numbers carried in an add-block response.
+    void _refresh_back_pressure_version_wait_time(
+            const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
+                    tablet_load_infos);
+
VTabletWriter* _parent = nullptr;
IndexChannel* _index_channel = nullptr;
int64_t _node_id = -1;
@@ -398,6 +405,7 @@ protected:
std::atomic<int64_t> _serialize_batch_ns {0};
std::atomic<int64_t> _queue_push_lock_ns {0};
std::atomic<int64_t> _actual_consume_ns {0};
+ std::atomic<int64_t> _load_back_pressure_version_block_ms {0};
VNodeChannelStat _stat;
// lock to protect _is_closed.
@@ -436,6 +444,7 @@ protected:
bool _is_incremental;
std::atomic<int64_t> _write_bytes {0};
+ std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
};
// an IndexChannel is related to specific table and its rollup and mv
@@ -735,6 +744,7 @@ private:
RuntimeProfile::Counter* _max_wait_exec_timer = nullptr;
RuntimeProfile::Counter* _add_batch_number = nullptr;
RuntimeProfile::Counter* _num_node_channels = nullptr;
+ RuntimeProfile::Counter* _load_back_pressure_version_time_ms = nullptr;
// the timeout of load channels opened by this tablet sink. in second
int64_t _load_channel_timeout_s = 0;
diff --git
a/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
b/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
new file mode 100644
index 00000000000..48747266306
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_load_back_pressure_version", "nonConcurrent") {
+    final String tableName = "test_load_back_pressure_version"
+    final String thresholdConfig = "load_back_pressure_version_threshold"
+
+    // Session switch is turned off for the test and turned back on in the finally block.
+    sql """ set enable_memtable_on_sink_node=false """
+    sql """ DROP TABLE IF EXISTS ${tableName}"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS `${tableName}` (
+            `id` BIGINT NOT NULL,
+            `value` int(11) NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        )
+    """
+
+    try {
+        // With a threshold of 0 the BE is expected to report back pressure for any tablet
+        // that has versions; the insert must still succeed.
+        set_be_param(thresholdConfig, "0")
+        sql "insert into ${tableName} values(1,1)"
+
+        def rows = sql "select * from ${tableName}"
+        logger.info("res: " + rows.size())
+        assertTrue(rows.size() == 1)
+    } finally {
+        // Put the BE config and the session variable back to the values used elsewhere.
+        set_be_param(thresholdConfig, "80")
+        sql """ set enable_memtable_on_sink_node=true """
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]