This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 3eb40413 iobuf support reserve_aligned (#2942) 3eb40413 is described below commit 3eb40413b2e3b4747a101dae5d7dcaba9b48a324 Author: Yang,Liming <liming.y...@139.com> AuthorDate: Thu Apr 24 15:37:26 2025 +0800 iobuf support reserve_aligned (#2942) --- src/butil/iobuf.cpp | 56 ++++++++++++++++++++++++++ src/butil/iobuf.h | 11 +++++ test/iobuf_unittest.cpp | 105 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 172 insertions(+) diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp index 8895fb16..59ad61a2 100644 --- a/src/butil/iobuf.cpp +++ b/src/butil/iobuf.cpp @@ -355,6 +355,23 @@ inline IOBuf::Block* create_block() { return create_block(IOBuf::DEFAULT_BLOCK_SIZE); } +inline IOBuf::Block* create_block_aligned(size_t block_size, size_t alignment) { + if (block_size > 0xFFFFFFFFULL) { + LOG(FATAL) << "block_size=" << block_size << " is too large"; + return NULL; + } + char* mem = (char*)iobuf::blockmem_allocate(block_size); + if (mem == NULL) { + return NULL; + } + char* data = mem + sizeof(IOBuf::Block); + // change data pointer & data size make align satisfied + size_t adder = (-reinterpret_cast<uintptr_t>(data)) & (alignment - 1); + size_t size = + (block_size - sizeof(IOBuf::Block) - adder) & ~(alignment - 1); + return new (mem) IOBuf::Block(data + adder, size); +} + // === Share TLS blocks between appending operations === // Max number of blocks in each TLS. This is a soft limit namely // release_tls_block_chain() may exceed this limit sometimes. @@ -1785,6 +1802,45 @@ void IOPortal::return_cached_blocks_impl(Block* b) { iobuf::release_tls_block_chain(b); } +IOBuf::Area IOReserveAlignedBuf::reserve(size_t count) { + IOBuf::Area result = INVALID_AREA; + if (_reserved == true) { + LOG(ERROR) << "Already call reserved"; + return result; + } + _reserved = true; + bool is_power_two = _alignment > 0 && (_alignment & (_alignment - 1)); + if (is_power_two != 0) { + LOG(ERROR) << "Invalid alignment, must power of two"; + return INVALID_AREA; + } + count = (count + _alignment - 1) & ~(_alignment - 1); + size_t total_nc = 0; + while (total_nc < count) { + const auto block_size = + std::max(_alignment, 4096UL) * 2 + sizeof(IOBuf::Block); + auto b = iobuf::create_block_aligned(block_size, _alignment); + if (BAIDU_UNLIKELY(!b)) { + LOG(ERROR) << "Create block failed"; + return result; + } + const size_t nc = std::min(count - total_nc, b->left_space()); + const IOBuf::BlockRef r = {(uint32_t)b->size, (uint32_t)nc, b}; + _push_back_ref(r); + // aligned block is not from tls, release block ref + b->dec_ref(); + if (total_nc == 0) { + // Encode the area at first time. Notice that the pushed ref may + // be merged with existing ones. + result = make_area(_ref_num() - 1, _back_ref().length - nc, count); + } + // add total nc + total_nc += nc; + b->size += nc; + }; + return result; +} + //////////////// IOBufCutter //////////////// IOBufCutter::IOBufCutter(butil::IOBuf* buf) diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h index 978f9758..3682f61e 100644 --- a/src/butil/iobuf.h +++ b/src/butil/iobuf.h @@ -489,6 +489,17 @@ private: Block* _block; }; +class IOReserveAlignedBuf : public IOBuf { +public: + IOReserveAlignedBuf(size_t alignment) + : _alignment(alignment), _reserved(false) {} + Area reserve(size_t count); + +private: + size_t _alignment; + bool _reserved; +}; + // Specialized utility to cut from IOBuf faster than using corresponding // methods in IOBuf. // Designed for efficiently parsing data from IOBuf. diff --git a/test/iobuf_unittest.cpp b/test/iobuf_unittest.cpp index b45a174c..a919cbda 100644 --- a/test/iobuf_unittest.cpp +++ b/test/iobuf_unittest.cpp @@ -22,6 +22,7 @@ #include <fcntl.h> // O_RDONLY #include <stdlib.h> #include <memory> +#include <cstring> #include <butil/files/temp_file.h> // TempFile #include <butil/containers/flat_map.h> #include <butil/macros.h> @@ -1785,4 +1786,108 @@ TEST_F(IOBufTest, acquire_tls_block) { ASSERT_NE(butil::iobuf::block_cap(b), butil::iobuf::block_size(b)); } +TEST_F(IOBufTest, reserve_aligned) { + { + butil::IOReserveAlignedBuf buf(16); + auto area = buf.reserve(1024); + ASSERT_NE(area, butil::IOBuf::INVALID_AREA); + butil::IOBufAsZeroCopyInputStream wrapper(buf); + const void* data; + int size; + int total_size = 0; + while (wrapper.Next(&data, &size)) { + ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 16, 0); + ASSERT_EQ(size % 16, 0); + total_size += size; + } + ASSERT_EQ(total_size, 1024); + } + { + butil::IOReserveAlignedBuf buf(4096); + auto area = buf.reserve(1024); + ASSERT_NE(area, butil::IOBuf::INVALID_AREA); + butil::IOBufAsZeroCopyInputStream wrapper(buf); + const void* data; + int size; + int total_size = 0; + while (wrapper.Next(&data, &size)) { + ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 4096, 0); + ASSERT_EQ(size % 4096, 0); + total_size += size; + } + ASSERT_EQ(total_size, 4096); + } + { + butil::IOReserveAlignedBuf buf(4096); + auto area = buf.reserve(8191); + ASSERT_NE(area, butil::IOBuf::INVALID_AREA); + butil::IOBufAsZeroCopyInputStream wrapper(buf); + const void* data; + int size; + int total_size = 0; + while (wrapper.Next(&data, &size)) { + ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 4096, 0); + ASSERT_EQ(size % 4096, 0); + total_size += size; + } + ASSERT_EQ(total_size, 8192); + } + { + butil::IOReserveAlignedBuf buf(4096); + auto area = buf.reserve(4096 * 10 - 1); + ASSERT_NE(area, butil::IOBuf::INVALID_AREA); + butil::IOBufAsZeroCopyInputStream wrapper(buf); + const void* data; + int size; + int total_size = 0; + while (wrapper.Next(&data, &size)) { + ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 4096, 0); + ASSERT_EQ(size % 4096, 0); + total_size += size; + } + ASSERT_EQ(total_size, 4096 * 10); + } + { + butil::IOReserveAlignedBuf buf(4095); + auto area = buf.reserve(4096); + ASSERT_EQ(area, butil::IOBuf::INVALID_AREA); + } + { + butil::IOReserveAlignedBuf buf(8192); + auto area = buf.reserve(4096 * 10 + 1); + ASSERT_NE(area, butil::IOBuf::INVALID_AREA); + butil::IOBufAsZeroCopyInputStream wrapper(buf); + const void* data; + int size; + int total_size = 0; + while (wrapper.Next(&data, &size)) { + ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 4096, 0); + ASSERT_EQ(size % 4096, 0); + total_size += size; + } + ASSERT_EQ(total_size, 4096 * 10 + 8192); + } + { + butil::IOReserveAlignedBuf buf(4096); + auto area = buf.reserve(1024 * 1024 * 3); + ASSERT_NE(area, butil::IOBuf::INVALID_AREA); + butil::IOBufAsZeroCopyInputStream wrapper(buf); + const void* data; + int size; + int count = 0; + int total_size = 0; + std::stringstream ss; + while (wrapper.Next(&data, &size)) { + ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 4096, 0); + ASSERT_EQ(size % 4096, 0); + std::string str(size, 'A' + count++); + ss << str; + std::memcpy(const_cast<void*>(data), str.data(), str.size()); + total_size += size; + } + ASSERT_EQ(total_size, 3145728); + ASSERT_EQ(ss.str(), buf.to_string()); + } +} + } // namespace --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org