This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dbd796c0fb7 [Chore](brpc) add gc for abafreelist to avoid eagain and
set brpc timeout_ms to 2s (#37888)
dbd796c0fb7 is described below
commit dbd796c0fb7edac1801ad9473f9186c7a360c3de
Author: Pxl <[email protected]>
AuthorDate: Fri Jul 19 15:12:39 2024 +0800
[Chore](brpc) add gc for abafreelist to avoid eagain and set brpc
timeout_ms to 2s (#37888)
## Proposed changes
1. add gc for abafreelist to avoid eagain errors such as:
```
[CANCELLED]failed to send brpc when exchange, error=Resource temporarily
unavailable, error_text=[E11]Resource temporarily unavailable
```
2. set brpc timeout_ms to 2s
---
be/src/util/brpc_client_cache.h | 1 +
....9.0-add-gc-on-abalist-and-adjust-gc-size.patch | 280 +++++++++++++++++++++
2 files changed, 281 insertions(+)
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index 290f2cc3e04..ebef80f4a6b 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -126,6 +126,7 @@ public:
options.connection_group = connection_group;
}
options.connect_timeout_ms = 2000;
+ options.timeout_ms = 2000;
options.max_retry = 10;
std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
diff --git
a/thirdparty/patches/brpc-1.9.0-add-gc-on-abalist-and-adjust-gc-size.patch
b/thirdparty/patches/brpc-1.9.0-add-gc-on-abalist-and-adjust-gc-size.patch
new file mode 100644
index 00000000000..7287b4a536d
--- /dev/null
+++ b/thirdparty/patches/brpc-1.9.0-add-gc-on-abalist-and-adjust-gc-size.patch
@@ -0,0 +1,280 @@
+From fb53ab9245e8570d44a2eeea2ab536841a7876ec Mon Sep 17 00:00:00 2001
+From: BiteTheDDDDt <[email protected]>
+Date: Tue, 16 Jul 2024 14:21:21 +0800
+Subject: [PATCH] add gc on abalist and adjust gc size
+
+---
+ src/bthread/bthread.cpp | 3 +-
+ src/bthread/id.cpp | 1 +
+ src/bthread/list_of_abafree_id.h | 161 ++++++++++++++++++++++++++++---
+ 3 files changed, 151 insertions(+), 14 deletions(-)
+
+diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
+index 5ac0c3b1..86fd2c90 100644
+--- a/src/bthread/bthread.cpp
++++ b/src/bthread/bthread.cpp
+@@ -147,7 +147,8 @@ start_from_non_worker(bthread_t* __restrict tid,
+
+ struct TidTraits {
+ static const size_t BLOCK_SIZE = 63;
+- static const size_t MAX_ENTRIES = 65536;
++ static const size_t MAX_ENTRIES = 655360;
++ static const size_t INIT_GC_SIZE = 65536;
+ static const bthread_t ID_INIT;
+ static bool exists(bthread_t id) { return bthread::TaskGroup::exists(id);
}
+ };
+diff --git a/src/bthread/id.cpp b/src/bthread/id.cpp
+index 41c49a3f..ba77580a 100644
+--- a/src/bthread/id.cpp
++++ b/src/bthread/id.cpp
+@@ -291,6 +291,7 @@ void id_pool_status(std::ostream &os) {
+ struct IdTraits {
+ static const size_t BLOCK_SIZE = 63;
+ static const size_t MAX_ENTRIES = 100000;
++ static const size_t INIT_GC_SIZE = 4096;
+ static const bthread_id_t ID_INIT;
+ static bool exists(bthread_id_t id)
+ { return bthread::id_exists_with_true_negatives(id); }
+diff --git a/src/bthread/list_of_abafree_id.h
b/src/bthread/list_of_abafree_id.h
+index ac2b2234..45043acc 100644
+--- a/src/bthread/list_of_abafree_id.h
++++ b/src/bthread/list_of_abafree_id.h
+@@ -22,8 +22,10 @@
+ #ifndef BTHREAD_LIST_OF_ABAFREE_ID_H
+ #define BTHREAD_LIST_OF_ABAFREE_ID_H
+
+-#include <vector>
+ #include <deque>
++#include <vector>
++
++#include "butil/macros.h"
+
+ namespace bthread {
+
+@@ -48,6 +50,9 @@ namespace bthread {
+ // // Max #entries. Often has close relationship with concurrency, 65536
+ // // is "huge" for most apps.
+ // static const size_t MAX_ENTRIES = 65536;
++// // Initial GC threshold: when the number of entries held by the list
++// // (blocks * BLOCK_SIZE) exceeds this value, a GC pass is started to
++// // release blocks that are no longer needed.
+ //
+ // // Initial value of id. Id with the value is treated as invalid.
+ // static const Id ID_INIT = ...;
+@@ -64,13 +69,14 @@ class ListOfABAFreeId {
+ public:
+ ListOfABAFreeId();
+ ~ListOfABAFreeId();
+-
++
+ // Add an identifier into the list.
+ int add(Id id);
+-
++
+ // Apply fn(id) to all identifiers.
+- template <typename Fn> void apply(const Fn& fn);
+-
++ template <typename Fn>
++ void apply(const Fn& fn);
++
+ // Put #entries of each level into `counts'
+ // Returns #levels.
+ size_t get_sizes(size_t* counts, size_t n);
+@@ -82,19 +88,31 @@ private:
+ IdBlock* next;
+ };
+ void forward_index();
++
++ struct TempIdBlock {
++ IdBlock* block;
++ uint32_t index;
++ uint32_t nblock;
++ };
++
++ int gc();
++ int add_to_temp_list(TempIdBlock* temp_list, Id id);
++ template <typename Fn>
++ int for_each(const Fn& fn);
++ void free_list(IdBlock* block);
++
+ IdBlock* _cur_block;
+ uint32_t _cur_index;
+ uint32_t _nblock;
+ IdBlock _head_block;
++ uint32_t _next_gc_size;
+ };
+
+ // [impl.]
+
+ template <typename Id, typename IdTraits>
+ ListOfABAFreeId<Id, IdTraits>::ListOfABAFreeId()
+- : _cur_block(&_head_block)
+- , _cur_index(0)
+- , _nblock(1) {
++ : _cur_block(&_head_block), _cur_index(0), _nblock(1),
_next_gc_size(IdTraits::INIT_GC_SIZE) {
+ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
+ _head_block.ids[i] = IdTraits::ID_INIT;
+ }
+@@ -140,6 +158,30 @@ int ListOfABAFreeId<Id, IdTraits>::add(Id id) {
+ }
+ saved_pos[i] = pos;
+ }
++ // The list is over MAX_ENTRIES even though a GC is expected to run before that point: report the error by returning EAGAIN.
++ if (_nblock * IdTraits::BLOCK_SIZE > IdTraits::MAX_ENTRIES) {
++ return EAGAIN;
++ }
++ // If the number of entries exceeds the current GC threshold, run a GC pass.
++ if (_nblock * IdTraits::BLOCK_SIZE > _next_gc_size) {
++ uint32_t before_gc_blocks = _nblock;
++ int rc = gc();
++ // To avoid running GC too frequently, the current threshold is kept only when this GC was effective enough;
++ // otherwise the threshold for the next GC is doubled.
++ //
++ // A GC counts as effective enough when the entries it released amount to at least 3/4 of the
++ // current threshold, i.e. roughly 1/4 or less is retained.
++ if ((before_gc_blocks - _nblock) * IdTraits::BLOCK_SIZE <
(_next_gc_size - (_next_gc_size >> 2))) {
++ _next_gc_size <<= 1;
++ // Make sure that a GC always occurs before MAX_ENTRIES is reached.
++ static_assert(IdTraits::MAX_ENTRIES > IdTraits::BLOCK_SIZE * 2,
"MAX_ENTRIES should be greater than 2 * IdTraits::BLOCK_SIZE");
++ if (_next_gc_size >= IdTraits::MAX_ENTRIES) {
++ _next_gc_size = IdTraits::MAX_ENTRIES - IdTraits::BLOCK_SIZE
* 2;
++ }
++ }
++
++ return rc;
++ }
+ // The list is considered to be "crowded", add a new block and scatter
+ // the conflict identifiers by inserting an empty entry after each of
+ // them, so that even if the identifiers are still valid when we walk
+@@ -152,9 +194,6 @@ int ListOfABAFreeId<Id, IdTraits>::add(Id id) {
+ //
+ // [..xxxx....] -> [......yyyy] -> [..........]
+ // block A new block block B
+- if (_nblock * IdTraits::BLOCK_SIZE > IdTraits::MAX_ENTRIES) {
+- return EAGAIN;
+- }
+ IdBlock* new_block = new (std::nothrow) IdBlock;
+ if (NULL == new_block) {
+ return ENOMEM;
+@@ -188,6 +227,93 @@ int ListOfABAFreeId<Id, IdTraits>::add(Id id) {
+ return 0;
+ }
+
++template <typename Id, typename IdTraits>
++int ListOfABAFreeId<Id, IdTraits>::gc() {
++ IdBlock* new_block = new (std::nothrow) IdBlock;
++ if (NULL == new_block) {
++ return ENOMEM;
++ }
++ // initialize the first block of the new list
++ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
++ new_block->ids[i] = IdTraits::ID_INIT;
++ }
++ new_block->next = NULL;
++
++ TempIdBlock tmp_id_block;
++ tmp_id_block.block = new_block;
++ tmp_id_block.nblock = 1;
++ tmp_id_block.index = 0;
++
++ // Copy each still-existing id of the old list into the new list, followed by an empty slot
++ int rc = for_each([&](Id id) {
++ int rc;
++ rc = add_to_temp_list(&tmp_id_block, id);
++ if (rc != 0) {
++ return rc;
++ }
++ rc = add_to_temp_list(&tmp_id_block, IdTraits::ID_INIT);
++ if (rc != 0) {
++ return rc;
++ }
++ return 0;
++ });
++
++ if (rc != 0) {
++ free_list(new_block);
++ return rc;
++ }
++
++ // reset head block
++ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
++ _head_block.ids[i] = IdTraits::ID_INIT;
++ }
++
++ free_list(_head_block.next);
++ _cur_block = tmp_id_block.block;
++ _cur_index = tmp_id_block.index;
++ // blocks of the new list plus the embedded head block
++ _nblock = tmp_id_block.nblock + 1;
++ _head_block.next = new_block;
++
++ return 0;
++}
++
++template <typename Id, typename IdTraits>
++int ListOfABAFreeId<Id, IdTraits>::add_to_temp_list(TempIdBlock* block_list,
Id id) {
++ block_list->block->ids[block_list->index++] = id;
++ // current block is full: append a new block to the list
++ if (block_list->index == IdTraits::BLOCK_SIZE) {
++ block_list->index = 0;
++ block_list->nblock++;
++ block_list->block->next = new (std::nothrow) IdBlock;
++ if (NULL == block_list->block->next) {
++ return ENOMEM;
++ }
++ block_list->block = block_list->block->next;
++ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
++ block_list->block->ids[i] = IdTraits::ID_INIT;
++ }
++ block_list->block->next = NULL;
++ }
++ return 0;
++}
++
++template <typename Id, typename IdTraits>
++template <typename Fn>
++int ListOfABAFreeId<Id, IdTraits>::for_each(const Fn& fn) {
++ for (IdBlock* p = &_head_block; p != NULL; p = p->next) {
++ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
++ if (p->ids[i] != IdTraits::ID_INIT &&
IdTraits::exists(p->ids[i])) {
++ int rc = fn(p->ids[i]);
++ if (rc != 0) {
++ return rc;
++ }
++ }
++ }
++ }
++ return 0;
++}
++
+ template <typename Id, typename IdTraits>
+ template <typename Fn>
+ void ListOfABAFreeId<Id, IdTraits>::apply(const Fn& fn) {
+@@ -200,6 +326,15 @@ void ListOfABAFreeId<Id, IdTraits>::apply(const Fn& fn) {
+ }
+ }
+
++template <typename Id, typename IdTraits>
++void ListOfABAFreeId<Id, IdTraits>::free_list(IdBlock* p) {
++ for (; p != NULL;) {
++ IdBlock* saved_next = p->next;
++ delete p;
++ p = saved_next;
++ }
++}
++
+ template <typename Id, typename IdTraits>
+ size_t ListOfABAFreeId<Id, IdTraits>::get_sizes(size_t* cnts, size_t n) {
+ if (n == 0) {
+@@ -210,6 +345,6 @@ size_t ListOfABAFreeId<Id, IdTraits>::get_sizes(size_t*
cnts, size_t n) {
+ return 1;
+ }
+
+-} // namespace bthread
++} // namespace bthread
+
+-#endif // BTHREAD_LIST_OF_ABAFREE_ID_H
++#endif // BTHREAD_LIST_OF_ABAFREE_ID_H
+--
+2.31.1
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]