This is an automated email from the ASF dual-hosted git repository.

jamesge pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git

commit 3b4b0e6a76dc16390ab571a8de8f86ce52b71016
Author: gejun <ge...@bilibili.com>
AuthorDate: Tue May 14 16:04:07 2019 +0800

    Add IOBufCutter
---
 src/butil/iobuf.cpp   | 215 ++++++++++++++++++++++++++++++++++++--------------
 src/butil/iobuf.h     |  64 ++++++++++++++-
 src/butil/iobuf_inl.h |  96 +++++++++++++++++++---
 3 files changed, 300 insertions(+), 75 deletions(-)

diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp
index 2219ff8..daeaa39 100644
--- a/src/butil/iobuf.cpp
+++ b/src/butil/iobuf.cpp
@@ -312,12 +312,6 @@ inline IOBuf::Block* create_block() {
 // release_tls_block_chain() may exceed this limit sometimes.
 const int MAX_BLOCKS_PER_THREAD = 8;
 
-// NOTE: not see differences in examples when CACHE_IOBUF_BLOCKREFS is turned on
-// (tcmalloc linked)
-#ifdef CACHE_IOBUF_BLOCKREFS
-const int MAX_BLOCKREFS_PER_THREAD = 8;
-#endif
-
 struct TLSData {
     // Head of the TLS block chain.
     IOBuf::Block* block_head;
@@ -327,19 +321,9 @@ struct TLSData {
     
     // True if the remote_tls_block_chain is registered to the thread.
     bool registered;
-
-#ifdef CACHE_IOBUF_BLOCKREFS
-    // Reuse array of BlockRef
-    int num_blockrefs;
-    IOBuf::BlockRef* blockrefs[MAX_BLOCKREFS_PER_THREAD];
-#endif
 };
 
-#ifdef CACHE_IOBUF_BLOCKREFS
-static __thread TLSData g_tls_data = { NULL, 0, false, 0, {} };
-#else
 static __thread TLSData g_tls_data = { NULL, 0, false };
-#endif
 
 // Used in UT
 IOBuf::Block* get_tls_block_head() { return g_tls_data.block_head; }
@@ -477,37 +461,16 @@ IOBuf::Block* acquire_tls_block() {
     return b;
 }
 
-inline IOBuf::BlockRef* acquire_blockref_array() {
-#ifdef CACHE_IOBUF_BLOCKREFS
-    TLSData& tls_data = g_tls_data;
-    if (tls_data.num_blockrefs) {
-        return tls_data.blockrefs[--tls_data.num_blockrefs];
-    }
-#endif
-    iobuf::g_newbigview.fetch_add(1, butil::memory_order_relaxed);
-    return new IOBuf::BlockRef[IOBuf::INITIAL_CAP];
-}
-
 inline IOBuf::BlockRef* acquire_blockref_array(size_t cap) {
-#ifdef CACHE_IOBUF_BLOCKREFS
-    if (cap == IOBuf::INITIAL_CAP) {
-        return acquire_blockref_array();
-    }
-#endif
     iobuf::g_newbigview.fetch_add(1, butil::memory_order_relaxed);
     return new IOBuf::BlockRef[cap];
 }
 
+inline IOBuf::BlockRef* acquire_blockref_array() {
+    return acquire_blockref_array(IOBuf::INITIAL_CAP);
+}
+
 inline void release_blockref_array(IOBuf::BlockRef* refs, size_t cap) {
-#ifdef CACHE_IOBUF_BLOCKREFS
-    if (cap == IOBuf::INITIAL_CAP) {
-        TLSData& tls_data = g_tls_data;
-        if (tls_data.num_blockrefs < MAX_BLOCKREFS_PER_THREAD) {
-            tls_data.blockrefs[tls_data.num_blockrefs++] = refs;
-            return;
-        }
-    }
-#endif
     delete[] refs;
 }
 
@@ -668,10 +631,13 @@ void IOBuf::_push_or_move_back_ref_to_bigview(const BlockRef& r) {
 template void IOBuf::_push_or_move_back_ref_to_bigview<true>(const BlockRef&);
 template void IOBuf::_push_or_move_back_ref_to_bigview<false>(const BlockRef&);
 
-int IOBuf::_pop_front_ref() {
+template <bool MOVEOUT>
+int IOBuf::_pop_or_moveout_front_ref() {
     if (_small()) {
         if (_sv.refs[0].block != NULL) {
-            _sv.refs[0].block->dec_ref();
+            if (!MOVEOUT) {
+                _sv.refs[0].block->dec_ref();
+            }
             _sv.refs[0] = _sv.refs[1];
             reset_block_ref(_sv.refs[1]);
             return 0;
@@ -680,7 +646,9 @@ int IOBuf::_pop_front_ref() {
     } else {
         // _bv.nref must be greater than 2
         const uint32_t start = _bv.start;
-        _bv.refs[start].block->dec_ref();
+        if (!MOVEOUT) {
+            _bv.refs[start].block->dec_ref();
+        }
         if (--_bv.nref > 2) {
             _bv.start = (start + 1) & _bv.cap_mask;
             _bv.nbytes -= _bv.refs[start].length;
@@ -694,6 +662,9 @@ int IOBuf::_pop_front_ref() {
         return 0;
     }
 }
+// Explicitly initialize templates.
+template int IOBuf::_pop_or_moveout_front_ref<true>();
+template int IOBuf::_pop_or_moveout_front_ref<false>();
 
 int IOBuf::_pop_back_ref() {
     if (_small()) {
@@ -768,12 +739,12 @@ size_t IOBuf::pop_front(size_t n) {
     return saved_n;
 }
 
-bool IOBuf::cut1(char* c) {
+bool IOBuf::cut1(void* c) {
     if (empty()) {
         return false;
     }
     IOBuf::BlockRef &r = _front_ref();
-    *c = r.block->data[r.offset];
+    *(char*)c = r.block->data[r.offset];
     if (r.length > 1) {
         ++r.offset;
         --r.length;
@@ -817,9 +788,9 @@ size_t IOBuf::cutn(IOBuf* out, size_t n) {
     while (n) {   // length() == 0 does not enter
         IOBuf::BlockRef &r = _front_ref();
         if (r.length <= n) {
-            out->_push_back_ref(r);
             n -= r.length;
-            _pop_front_ref();
+            out->_move_back_ref(r);
+            _moveout_front_ref();
         } else {
             const IOBuf::BlockRef cr = { r.offset, (uint32_t)n, r.block };
             out->_push_back_ref(cr);
@@ -872,8 +843,7 @@ size_t IOBuf::cutn(std::string* out, size_t n) {
     }
     const size_t old_size = out->size();
     out->resize(out->size() + n);
-    cutn(&out[0][old_size], n);
-    return n;
+    return cutn(&(*out)[old_size], n);
 }
 
 int IOBuf::_cut_by_char(IOBuf* out, char d) {
@@ -1412,10 +1382,10 @@ size_t IOBuf::copy_to(void* d, size_t n, size_t pos) const {
 
 size_t IOBuf::copy_to(std::string* s, size_t n, size_t pos) const {
     const size_t len = length();
-    if (n + pos > len) {
-        if (len <= pos) {
-            return 0;
-        }
+    if (len <= pos) {
+        return 0;
+    }
+    if (n > len - pos) {  // note: n + pos may overflow
         n = len - pos;
     }
     s->resize(n);
@@ -1424,10 +1394,10 @@ size_t IOBuf::copy_to(std::string* s, size_t n, size_t pos) const {
 
 size_t IOBuf::append_to(std::string* s, size_t n, size_t pos) const {
     const size_t len = length();
-    if (n + pos > len) {
-        if (len <= pos) {
-            return 0;
-        }
+    if (len <= pos) {
+        return 0;
+    }
+    if (n > len - pos) {  // note: n + pos may overflow
         n = len - pos;
     }
     const size_t old_size = s->size();
@@ -1737,6 +1707,135 @@ void IOPortal::return_cached_blocks_impl(Block* b) {
     iobuf::release_tls_block_chain(b);
 }
 
+//////////////// IOBufCutter ////////////////
+
+IOBufCutter::IOBufCutter(butil::IOBuf* buf)
+    : _data(NULL)
+    , _data_end(NULL)
+    , _block(NULL)
+    , _buf(buf) {
+}
+
+IOBufCutter::~IOBufCutter() {
+    if (_block) {
+        if (_data != _data_end) {
+            IOBuf::BlockRef& fr = _buf->_front_ref();
+            CHECK_EQ(fr.block, _block);
+            fr.offset = (uint32_t)((char*)_data - _block->data);
+            fr.length = (uint32_t)((char*)_data_end - (char*)_data);
+        } else {
+            _buf->_pop_front_ref();
+        }
+    }
+}
+bool IOBufCutter::load_next_ref() {
+    if (_block) {
+        _buf->_pop_front_ref();
+    }
+    if (!_buf->_ref_num()) {
+        _data = NULL;
+        _data_end = NULL;
+        _block = NULL;
+        return false;
+    } else {
+        const IOBuf::BlockRef& r = _buf->_front_ref();
+        _data = r.block->data + r.offset;
+        _data_end = (char*)_data + r.length;
+        _block = r.block;
+        return true;
+    }
+}
+
+size_t IOBufCutter::slower_copy_to(void* dst, size_t n) {
+    size_t size = (char*)_data_end - (char*)_data;
+    if (size == 0) {
+        if (!load_next_ref()) {
+            return 0;
+        }
+        size = (char*)_data_end - (char*)_data;
+        if (n <= size) {
+            memcpy(dst, _data, n);
+            return n;
+        }
+    }
+    void* const saved_dst = dst;
+    memcpy(dst, _data, size);
+    dst = (char*)dst + size;
+    n -= size;
+    const size_t nref = _buf->_ref_num();
+    for (size_t i = 1; i < nref; ++i) {
+        const IOBuf::BlockRef& r = _buf->_ref_at(i);
+        const size_t nc = std::min(n, (size_t)r.length);
+        memcpy(dst, r.block->data + r.offset, nc);
+        dst = (char*)dst + nc;
+        n -= nc;
+        if (n == 0) {
+            break;
+        }
+    }
+    return (char*)dst - (char*)saved_dst;
+}
+
+size_t IOBufCutter::cutn(butil::IOBuf* out, size_t n) {
+    if (n == 0) {
+        return 0;
+    }
+    const size_t size = (char*)_data_end - (char*)_data;
+    if (n <= size) {
+        const IOBuf::BlockRef r = { (uint32_t)((char*)_data - _block->data),
+                                    (uint32_t)n,
+                                    _block };
+        out->_push_back_ref(r);
+        _data = (char*)_data + n;
+        return n;
+    } else if (size != 0) {
+        const IOBuf::BlockRef r = { (uint32_t)((char*)_data - _block->data),
+                                    (uint32_t)size,
+                                    _block };
+        out->_move_back_ref(r);
+        _buf->_moveout_front_ref();
+        _data = NULL;
+        _data_end = NULL;
+        _block = NULL;
+        return _buf->cutn(out, n - size) + size;
+    } else {
+        if (_block) {
+            _data = NULL;
+            _data_end = NULL;
+            _block = NULL;
+            _buf->_pop_front_ref();
+        }
+        return _buf->cutn(out, n);
+    }
+}
+
+size_t IOBufCutter::cutn(void* out, size_t n) {
+    if (n == 0) {
+        return 0;
+    }
+    const size_t size = (char*)_data_end - (char*)_data;
+    if (n <= size) {
+        memcpy(out, _data, n);
+        _data = (char*)_data + n;
+        return n;
+    } else if (size != 0) {
+        memcpy(out, _data, size);
+        _buf->_pop_front_ref();
+        _data = NULL;
+        _data_end = NULL;
+        _block = NULL;
+        return _buf->cutn((char*)out + size, n - size) + size;
+    } else {
+        if (_block) {
+            _data = NULL;
+            _data_end = NULL;
+            _block = NULL;
+            _buf->_pop_front_ref();
+        }
+        return _buf->cutn(out, n);
+    }
+}
+
 IOBufAsZeroCopyInputStream::IOBufAsZeroCopyInputStream(const IOBuf& buf)
     : _ref_index(0)
     , _add_offset(0)
diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h
index 1347436..40f015a 100644
--- a/src/butil/iobuf.h
+++ b/src/butil/iobuf.h
@@ -59,6 +59,7 @@ class IOBuf {
 friend class IOBufAsZeroCopyInputStream;
 friend class IOBufAsZeroCopyOutputStream;
 friend class IOBufBytesIterator;
+friend class IOBufCutter;
 public:
     static const size_t DEFAULT_BLOCK_SIZE = 8192;
     static const size_t INITIAL_CAP = 32; // must be power of 2
@@ -128,7 +129,7 @@ public:
     // Returns bytes popped.
     size_t pop_back(size_t n);
 
-    // Cut off `n' bytes from front side and APPEND to `out'
+    // Cut off n bytes from front side and APPEND to `out'
     // If n == 0, nothing cut; if n >= length(), all bytes are cut
     // Returns bytes cut.
     size_t cutn(IOBuf* out, size_t n);
@@ -136,7 +137,7 @@ public:
     size_t cutn(std::string* out, size_t n);
     // Cut off 1 byte from the front side and set to *c
     // Return true on cut, false otherwise.
-    bool cut1(char* c);
+    bool cut1(void* c);
 
     // Cut from front side until the characters matches `delim', append
     // data before the matched characters to `out'.
@@ -319,7 +320,8 @@ public:
     //                      the internal buffer.
     // If n == 0 and buffer is empty, return value is undefined.
     const void* fetch(void* aux_buffer, size_t n) const;
-    // Just fetch one character.
+    // Fetch one character from front side.
+    // Returns pointer to the character, NULL on empty.
     const void* fetch1() const;
 
     // Remove all data
@@ -373,7 +375,14 @@ protected:
 
     // Pop a BlockRef from front side.
     // Returns: 0 on success and -1 on empty.
-    int _pop_front_ref();
+    int _pop_front_ref() { return _pop_or_moveout_front_ref<false>(); }
+
+    // Move a BlockRef out from front side.
+    // Returns: 0 on success and -1 on empty.
+    int _moveout_front_ref() { return _pop_or_moveout_front_ref<true>(); }
+
+    template <bool MOVEOUT>
+    int _pop_or_moveout_front_ref();
 
     // Pop a BlockRef from back side.
     // Returns: 0 on success and -1 on empty.
@@ -467,6 +476,51 @@ private:
     Block* _block;
 };
 
+// Specialized utility to cut from IOBuf faster than using corresponding
+// methods in IOBuf.
+// Designed for efficiently parsing data from IOBuf.
+// The cut IOBuf can be appended during cutting.
+class IOBufCutter {
+public:
+    explicit IOBufCutter(butil::IOBuf* buf);
+    ~IOBufCutter();
+
+    // Cut off n bytes and APPEND to `out'
+    // Returns bytes cut.
+    size_t cutn(butil::IOBuf* out, size_t n);
+    size_t cutn(std::string* out, size_t n);
+    size_t cutn(void* out, size_t n);
+
+    // Cut off 1 byte from the front side and set to *c
+    // Return true on cut, false otherwise.
+    bool cut1(void* data);
+
+    // Copy n bytes into `data'
+    // Returns bytes copied.
+    size_t copy_to(void* data, size_t n);
+
+    // Fetch one character.
+    // Returns pointer to the character, NULL on empty
+    const void* fetch1();
+
+    // Pop n bytes from front side
+    // Returns bytes popped.
+    size_t pop_front(size_t n);
+
+    // Uncut bytes
+    size_t remaining_bytes() const;
+
+private:
+    size_t slower_copy_to(void* data, size_t n);
+    bool load_next_ref();
+
+private:
+    void* _data;
+    void* _data_end;
+    IOBuf::Block* _block;
+    IOBuf* _buf;
+};
+
 // Parse protobuf message from IOBuf. Notice that this wrapper does not change
 // source IOBuf, which also should not change during lifetime of the wrapper.
 // Even if a IOBufAsZeroCopyInputStream is created but parsed, the source
@@ -631,6 +685,8 @@ private:
     int add_block();
 
     void* _data;
+    // Saving _data_end instead of _size avoid modifying _data and _size
+    // in each push_back() which is probably a hotspot.
     void* _data_end;
     IOBuf _buf;
     IOBufAsZeroCopyOutputStream _zc_stream;
diff --git a/src/butil/iobuf_inl.h b/src/butil/iobuf_inl.h
index 0b3c036..2538f4d 100644
--- a/src/butil/iobuf_inl.h
+++ b/src/butil/iobuf_inl.h
@@ -189,22 +189,92 @@ inline void IOBuf::_move_back_ref(const BlockRef& r) {
     }
 }
 
-inline int IOBufAppender::append(const void* src, size_t n) {
-    const size_t size = (char*)_data_end - (char*)_data;
+////////////////  IOBufCutter ////////////////
+inline size_t IOBufCutter::remaining_bytes() const {
+    if (_block) {
+        return (char*)_data_end - (char*)_data + _buf->size() - _buf->_front_ref().length;
+    } else {
+        return _buf->size();
+    }
+}
+
+inline bool IOBufCutter::cut1(void* c) {
+    if (_data == _data_end) {
+        if (!load_next_ref()) {
+            return false;
+        }
+    }
+    *(char*)c = *(const char*)_data;
+    _data = (char*)_data + 1;
+    return true;
+}
+
+inline const void* IOBufCutter::fetch1() {
+    if (_data == _data_end) {
+        if (!load_next_ref()) {
+            return NULL;
+        }
+    }
+    return _data;
+}
+
+inline size_t IOBufCutter::copy_to(void* out, size_t n) {
+    size_t size = (char*)_data_end - (char*)_data;
     if (n <= size) {
-        fast_memcpy(_data, src, n);
-        _data = (char*)_data + n;
+        memcpy(out, _data, n);
+        return n;
+    }
+    return slower_copy_to(out, n);
+}
+
+inline size_t IOBufCutter::pop_front(size_t n) {
+    const size_t saved_n = n;
+    do {
+        const size_t size = (char*)_data_end - (char*)_data;
+        if (n <= size) {
+            _data = (char*)_data + n;
+            return saved_n;
+        }
+        if (size != 0) {
+            n -= size;
+        }
+        if (!load_next_ref()) {
+            return saved_n;
+        }
+    } while (true);
+}
+
+inline size_t IOBufCutter::cutn(std::string* out, size_t n) {
+    if (n == 0) {
         return 0;
-    } 
-    if (size != 0) {
-        fast_memcpy(_data, src, size);
-        src = (const char*)src + size;
-        n -= size;
     }
-    if (add_block() != 0) {
-        return -1;
+    const size_t len = remaining_bytes();
+    if (n > len) {
+        n = len;
     }
-    return append(src, n); // tailr
+    const size_t old_size = out->size();
+    out->resize(out->size() + n);
+    return cutn(&(*out)[old_size], n);
+}
+
+/////////////// IOBufAppender /////////////////
+inline int IOBufAppender::append(const void* src, size_t n) {
+    do {
+        const size_t size = (char*)_data_end - (char*)_data;
+        if (n <= size) {
+            memcpy(_data, src, n);
+            _data = (char*)_data + n;
+            return 0;
+        }
+        if (size != 0) {
+            memcpy(_data, src, size);
+            src = (const char*)src + size;
+            n -= size;
+        }
+        if (add_block() != 0) {
+            return -1;
+        }
+    } while (true);
 }
 
 inline int IOBufAppender::append(const StringPiece& str) {
@@ -292,7 +362,7 @@ inline size_t IOBufBytesIterator::copy_and_forward(void* buf, size_t n) {
     while (nc < n && _bytes_left != 0) {
         const size_t block_size = _block_end - _block_begin;
         const size_t to_copy = std::min(block_size, n - nc);
-        fast_memcpy((char*)buf + nc, _block_begin, to_copy);
+        memcpy((char*)buf + nc, _block_begin, to_copy);
         _block_begin += to_copy;
         _bytes_left -= to_copy;
         nc += to_copy;


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to