This is an automated email from the ASF dual-hosted git repository. jamesge pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
commit 3b4b0e6a76dc16390ab571a8de8f86ce52b71016 Author: gejun <ge...@bilibili.com> AuthorDate: Tue May 14 16:04:07 2019 +0800 Add IOBufCutter --- src/butil/iobuf.cpp | 215 ++++++++++++++++++++++++++++++++++++-------------- src/butil/iobuf.h | 64 ++++++++++++++- src/butil/iobuf_inl.h | 96 +++++++++++++++++++--- 3 files changed, 300 insertions(+), 75 deletions(-) diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp index 2219ff8..daeaa39 100644 --- a/src/butil/iobuf.cpp +++ b/src/butil/iobuf.cpp @@ -312,12 +312,6 @@ inline IOBuf::Block* create_block() { // release_tls_block_chain() may exceed this limit sometimes. const int MAX_BLOCKS_PER_THREAD = 8; -// NOTE: not see differences in examples when CACHE_IOBUF_BLOCKREFS is turned on -// (tcmalloc linked) -#ifdef CACHE_IOBUF_BLOCKREFS -const int MAX_BLOCKREFS_PER_THREAD = 8; -#endif - struct TLSData { // Head of the TLS block chain. IOBuf::Block* block_head; @@ -327,19 +321,9 @@ struct TLSData { // True if the remote_tls_block_chain is registered to the thread. bool registered; - -#ifdef CACHE_IOBUF_BLOCKREFS - // Reuse array of BlockRef - int num_blockrefs; - IOBuf::BlockRef* blockrefs[MAX_BLOCKREFS_PER_THREAD]; -#endif }; -#ifdef CACHE_IOBUF_BLOCKREFS -static __thread TLSData g_tls_data = { NULL, 0, false, 0, {} }; -#else static __thread TLSData g_tls_data = { NULL, 0, false }; -#endif // Used in UT IOBuf::Block* get_tls_block_head() { return g_tls_data.block_head; } @@ -477,37 +461,16 @@ IOBuf::Block* acquire_tls_block() { return b; } -inline IOBuf::BlockRef* acquire_blockref_array() { -#ifdef CACHE_IOBUF_BLOCKREFS - TLSData& tls_data = g_tls_data; - if (tls_data.num_blockrefs) { - return tls_data.blockrefs[--tls_data.num_blockrefs]; - } -#endif - iobuf::g_newbigview.fetch_add(1, butil::memory_order_relaxed); - return new IOBuf::BlockRef[IOBuf::INITIAL_CAP]; -} - inline IOBuf::BlockRef* acquire_blockref_array(size_t cap) { -#ifdef CACHE_IOBUF_BLOCKREFS - if (cap == IOBuf::INITIAL_CAP) { - return acquire_blockref_array(); - } -#endif iobuf::g_newbigview.fetch_add(1, butil::memory_order_relaxed); return new IOBuf::BlockRef[cap]; } +inline IOBuf::BlockRef* acquire_blockref_array() { + return acquire_blockref_array(IOBuf::INITIAL_CAP); +} + inline void release_blockref_array(IOBuf::BlockRef* refs, size_t cap) { -#ifdef CACHE_IOBUF_BLOCKREFS - if (cap == IOBuf::INITIAL_CAP) { - TLSData& tls_data = g_tls_data; - if (tls_data.num_blockrefs < MAX_BLOCKREFS_PER_THREAD) { - tls_data.blockrefs[tls_data.num_blockrefs++] = refs; - return; - } - } -#endif delete[] refs; } @@ -668,10 +631,13 @@ void IOBuf::_push_or_move_back_ref_to_bigview(const BlockRef& r) { template void IOBuf::_push_or_move_back_ref_to_bigview<true>(const BlockRef&); template void IOBuf::_push_or_move_back_ref_to_bigview<false>(const BlockRef&); -int IOBuf::_pop_front_ref() { +template <bool MOVEOUT> +int IOBuf::_pop_or_moveout_front_ref() { if (_small()) { if (_sv.refs[0].block != NULL) { - _sv.refs[0].block->dec_ref(); + if (!MOVEOUT) { + _sv.refs[0].block->dec_ref(); + } _sv.refs[0] = _sv.refs[1]; reset_block_ref(_sv.refs[1]); return 0; @@ -680,7 +646,9 @@ int IOBuf::_pop_front_ref() { } else { // _bv.nref must be greater than 2 const uint32_t start = _bv.start; - _bv.refs[start].block->dec_ref(); + if (!MOVEOUT) { + _bv.refs[start].block->dec_ref(); + } if (--_bv.nref > 2) { _bv.start = (start + 1) & _bv.cap_mask; _bv.nbytes -= _bv.refs[start].length; @@ -694,6 +662,9 @@ int IOBuf::_pop_front_ref() { return 0; } } +// Explicitly initialize templates. +template int IOBuf::_pop_or_moveout_front_ref<true>(); +template int IOBuf::_pop_or_moveout_front_ref<false>(); int IOBuf::_pop_back_ref() { if (_small()) { @@ -768,12 +739,12 @@ size_t IOBuf::pop_front(size_t n) { return saved_n; } -bool IOBuf::cut1(char* c) { +bool IOBuf::cut1(void* c) { if (empty()) { return false; } IOBuf::BlockRef &r = _front_ref(); - *c = r.block->data[r.offset]; + *(char*)c = r.block->data[r.offset]; if (r.length > 1) { ++r.offset; --r.length; @@ -817,9 +788,9 @@ size_t IOBuf::cutn(IOBuf* out, size_t n) { while (n) { // length() == 0 does not enter IOBuf::BlockRef &r = _front_ref(); if (r.length <= n) { - out->_push_back_ref(r); n -= r.length; - _pop_front_ref(); + out->_move_back_ref(r); + _moveout_front_ref(); } else { const IOBuf::BlockRef cr = { r.offset, (uint32_t)n, r.block }; out->_push_back_ref(cr); @@ -872,8 +843,7 @@ size_t IOBuf::cutn(std::string* out, size_t n) { } const size_t old_size = out->size(); out->resize(out->size() + n); - cutn(&out[0][old_size], n); - return n; + return cutn(&(*out)[old_size], n); } int IOBuf::_cut_by_char(IOBuf* out, char d) { @@ -1412,10 +1382,10 @@ size_t IOBuf::copy_to(void* d, size_t n, size_t pos) const { size_t IOBuf::copy_to(std::string* s, size_t n, size_t pos) const { const size_t len = length(); - if (n + pos > len) { - if (len <= pos) { - return 0; - } + if (len <= pos) { + return 0; + } + if (n > len - pos) { // note: n + pos may overflow n = len - pos; } s->resize(n); @@ -1424,10 +1394,10 @@ size_t IOBuf::copy_to(std::string* s, size_t n, size_t pos) const { size_t IOBuf::append_to(std::string* s, size_t n, size_t pos) const { const size_t len = length(); - if (n + pos > len) { - if (len <= pos) { - return 0; - } + if (len <= pos) { + return 0; + } + if (n > len - pos) { // note: n + pos may overflow n = len - pos; } const size_t old_size = s->size(); @@ -1737,6 +1707,135 @@ void IOPortal::return_cached_blocks_impl(Block* b) { iobuf::release_tls_block_chain(b); } +//////////////// IOBufCutter //////////////// + +IOBufCutter::IOBufCutter(butil::IOBuf* buf) + : _data(NULL) + , _data_end(NULL) + , _block(NULL) + , _buf(buf) { +} + +IOBufCutter::~IOBufCutter() { + if (_block) { + if (_data != _data_end) { + IOBuf::BlockRef& fr = _buf->_front_ref(); + CHECK_EQ(fr.block, _block); + fr.offset = (uint32_t)((char*)_data - _block->data); + fr.length = (uint32_t)((char*)_data_end - (char*)_data); + } else { + _buf->_pop_front_ref(); + } + } +} +bool IOBufCutter::load_next_ref() { + if (_block) { + _buf->_pop_front_ref(); + } + if (!_buf->_ref_num()) { + _data = NULL; + _data_end = NULL; + _block = NULL; + return false; + } else { + const IOBuf::BlockRef& r = _buf->_front_ref(); + _data = r.block->data + r.offset; + _data_end = (char*)_data + r.length; + _block = r.block; + return true; + } +} + +size_t IOBufCutter::slower_copy_to(void* dst, size_t n) { + size_t size = (char*)_data_end - (char*)_data; + if (size == 0) { + if (!load_next_ref()) { + return 0; + } + size = (char*)_data_end - (char*)_data; + if (n <= size) { + memcpy(dst, _data, n); + return n; + } + } + void* const saved_dst = dst; + memcpy(dst, _data, size); + dst = (char*)dst + size; + n -= size; + const size_t nref = _buf->_ref_num(); + for (size_t i = 1; i < nref; ++i) { + const IOBuf::BlockRef& r = _buf->_ref_at(i); + const size_t nc = std::min(n, (size_t)r.length); + memcpy(dst, r.block->data + r.offset, nc); + dst = (char*)dst + nc; + n -= nc; + if (n == 0) { + break; + } + } + return (char*)dst - (char*)saved_dst; +} + +size_t IOBufCutter::cutn(butil::IOBuf* out, size_t n) { + if (n == 0) { + return 0; + } + const size_t size = (char*)_data_end - (char*)_data; + if (n <= size) { + const IOBuf::BlockRef r = { (uint32_t)((char*)_data - _block->data), + (uint32_t)n, + _block }; + out->_push_back_ref(r); + _data = (char*)_data + n; + return n; + } else if (size != 0) { + const IOBuf::BlockRef r = { (uint32_t)((char*)_data - _block->data), + (uint32_t)size, + _block }; + out->_move_back_ref(r); + _buf->_moveout_front_ref(); + _data = NULL; + _data_end = NULL; + _block = NULL; + return _buf->cutn(out, n - size) + size; + } else { + if (_block) { + _data = NULL; + _data_end = NULL; + _block = NULL; + _buf->_pop_front_ref(); + } + return _buf->cutn(out, n); + } +} + +size_t IOBufCutter::cutn(void* out, size_t n) { + if (n == 0) { + return 0; + } + const size_t size = (char*)_data_end - (char*)_data; + if (n <= size) { + memcpy(out, _data, n); + _data = (char*)_data + n; + return n; + } else if (size != 0) { + memcpy(out, _data, size); + _buf->_pop_front_ref(); + _data = NULL; + _data_end = NULL; + _block = NULL; + return _buf->cutn((char*)out + size, n - size) + size; + } else { + if (_block) { + _data = NULL; + _data_end = NULL; + _block = NULL; + _buf->_pop_front_ref(); + } + return _buf->cutn(out, n); + } +} + IOBufAsZeroCopyInputStream::IOBufAsZeroCopyInputStream(const IOBuf& buf) : _ref_index(0) , _add_offset(0) diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h index 1347436..40f015a 100644 --- a/src/butil/iobuf.h +++ b/src/butil/iobuf.h @@ -59,6 +59,7 @@ class IOBuf { friend class IOBufAsZeroCopyInputStream; friend class IOBufAsZeroCopyOutputStream; friend class IOBufBytesIterator; +friend class IOBufCutter; public: static const size_t DEFAULT_BLOCK_SIZE = 8192; static const size_t INITIAL_CAP = 32; // must be power of 2 @@ -128,7 +129,7 @@ public: // Returns bytes popped. size_t pop_back(size_t n); - // Cut off `n' bytes from front side and APPEND to `out' + // Cut off n bytes from front side and APPEND to `out' // If n == 0, nothing cut; if n >= length(), all bytes are cut // Returns bytes cut. size_t cutn(IOBuf* out, size_t n); @@ -136,7 +137,7 @@ public: size_t cutn(std::string* out, size_t n); // Cut off 1 byte from the front side and set to *c // Return true on cut, false otherwise. - bool cut1(char* c); + bool cut1(void* c); // Cut from front side until the characters matches `delim', append // data before the matched characters to `out'. @@ -319,7 +320,8 @@ public: // the internal buffer. // If n == 0 and buffer is empty, return value is undefined. const void* fetch(void* aux_buffer, size_t n) const; - // Just fetch one character. + // Fetch one character from front side. + // Returns pointer to the character, NULL on empty. const void* fetch1() const; // Remove all data @@ -373,7 +375,14 @@ protected: // Pop a BlockRef from front side. // Returns: 0 on success and -1 on empty. - int _pop_front_ref(); + int _pop_front_ref() { return _pop_or_moveout_front_ref<false>(); } + + // Move a BlockRef out from front side. + // Returns: 0 on success and -1 on empty. + int _moveout_front_ref() { return _pop_or_moveout_front_ref<true>(); } + + template <bool MOVEOUT> + int _pop_or_moveout_front_ref(); // Pop a BlockRef from back side. // Returns: 0 on success and -1 on empty. @@ -467,6 +476,51 @@ private: Block* _block; }; +// Specialized utility to cut from IOBuf faster than using corresponding +// methods in IOBuf. +// Designed for efficiently parsing data from IOBuf. +// The cut IOBuf can be appended during cutting. +class IOBufCutter { +public: + explicit IOBufCutter(butil::IOBuf* buf); + ~IOBufCutter(); + + // Cut off n bytes and APPEND to `out' + // Returns bytes cut. + size_t cutn(butil::IOBuf* out, size_t n); + size_t cutn(std::string* out, size_t n); + size_t cutn(void* out, size_t n); + + // Cut off 1 byte from the front side and set to *c + // Return true on cut, false otherwise. + bool cut1(void* data); + + // Copy n bytes into `data' + // Returns bytes copied. + size_t copy_to(void* data, size_t n); + + // Fetch one character. + // Returns pointer to the character, NULL on empty + const void* fetch1(); + + // Pop n bytes from front side + // Returns bytes popped. + size_t pop_front(size_t n); + + // Uncut bytes + size_t remaining_bytes() const; + +private: + size_t slower_copy_to(void* data, size_t n); + bool load_next_ref(); + +private: + void* _data; + void* _data_end; + IOBuf::Block* _block; + IOBuf* _buf; +}; + // Parse protobuf message from IOBuf. Notice that this wrapper does not change // source IOBuf, which also should not change during lifetime of the wrapper. // Even if a IOBufAsZeroCopyInputStream is created but parsed, the source @@ -631,6 +685,8 @@ private: int add_block(); void* _data; + // Saving _data_end instead of _size avoid modifying _data and _size + // in each push_back() which is probably a hotspot. void* _data_end; IOBuf _buf; IOBufAsZeroCopyOutputStream _zc_stream; diff --git a/src/butil/iobuf_inl.h b/src/butil/iobuf_inl.h index 0b3c036..2538f4d 100644 --- a/src/butil/iobuf_inl.h +++ b/src/butil/iobuf_inl.h @@ -189,22 +189,92 @@ inline void IOBuf::_move_back_ref(const BlockRef& r) { } } -inline int IOBufAppender::append(const void* src, size_t n) { - const size_t size = (char*)_data_end - (char*)_data; +//////////////// IOBufCutter //////////////// +inline size_t IOBufCutter::remaining_bytes() const { + if (_block) { + return (char*)_data_end - (char*)_data + _buf->size() - _buf->_front_ref().length; + } else { + return _buf->size(); + } +} + +inline bool IOBufCutter::cut1(void* c) { + if (_data == _data_end) { + if (!load_next_ref()) { + return false; + } + } + *(char*)c = *(const char*)_data; + _data = (char*)_data + 1; + return true; +} + +inline const void* IOBufCutter::fetch1() { + if (_data == _data_end) { + if (!load_next_ref()) { + return NULL; + } + } + return _data; +} + +inline size_t IOBufCutter::copy_to(void* out, size_t n) { + size_t size = (char*)_data_end - (char*)_data; if (n <= size) { - fast_memcpy(_data, src, n); - _data = (char*)_data + n; + memcpy(out, _data, n); + return n; + } + return slower_copy_to(out, n); +} + +inline size_t IOBufCutter::pop_front(size_t n) { + const size_t saved_n = n; + do { + const size_t size = (char*)_data_end - (char*)_data; + if (n <= size) { + _data = (char*)_data + n; + return saved_n; + } + if (size != 0) { + n -= size; + } + if (!load_next_ref()) { + return saved_n; + } + } while (true); +} + +inline size_t IOBufCutter::cutn(std::string* out, size_t n) { + if (n == 0) { return 0; - } - if (size != 0) { - fast_memcpy(_data, src, size); - src = (const char*)src + size; - n -= size; } - if (add_block() != 0) { - return -1; + const size_t len = remaining_bytes(); + if (n > len) { + n = len; } - return append(src, n); // tailr + const size_t old_size = out->size(); + out->resize(out->size() + n); + return cutn(&(*out)[old_size], n); +} + +/////////////// IOBufAppender ///////////////// +inline int IOBufAppender::append(const void* src, size_t n) { + do { + const size_t size = (char*)_data_end - (char*)_data; + if (n <= size) { + memcpy(_data, src, n); + _data = (char*)_data + n; + return 0; + } + if (size != 0) { + memcpy(_data, src, size); + src = (const char*)src + size; + n -= size; + } + if (add_block() != 0) { + return -1; + } + } while (true); } inline int IOBufAppender::append(const StringPiece& str) { @@ -292,7 +362,7 @@ inline size_t IOBufBytesIterator::copy_and_forward(void* buf, size_t n) { while (nc < n && _bytes_left != 0) { const size_t block_size = _block_end - _block_begin; const size_t to_copy = std::min(block_size, n - nc); - fast_memcpy((char*)buf + nc, _block_begin, to_copy); + memcpy((char*)buf + nc, _block_begin, to_copy); _block_begin += to_copy; _bytes_left -= to_copy; nc += to_copy; --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org