This is an automated email from the ASF dual-hosted git repository.
chenBright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 4cd0a727 Add IOBuf<->std::iostream adapters for zero-copy I/O (#3341)
4cd0a727 is described below
commit 4cd0a727e272fa285e1ff5f82f661a5bada83d3c
Author: Bright Chen <[email protected]>
AuthorDate: Sun Jun 14 12:51:18 2026 +0800
Add IOBuf<->std::iostream adapters for zero-copy I/O (#3341)
Introduce four classes that bridge IOBuf and the standard iostream
hierarchy, enabling parsers and serializers that take std::istream& /
std::ostream& (e.g. nlohmann::json) to read from and write to IOBuf
directly without an intermediate string copy:
- IOBufAsInputStreamBuf : read-only streambuf over IOBuf blocks
- IOBufInputStream : std::istream view over IOBuf
- IOBufAsOutputStreamBuf : append-only streambuf, reusing
IOBufAsZeroCopyOutputStream's TLS block pool
- IOBufOutputStream : std::ostream view over IOBuf
---
MODULE.bazel | 1 +
WORKSPACE | 18 +++
src/butil/iobuf.cpp | 126 +++++++++++++++++++
src/butil/iobuf.h | 161 +++++++++++++++++++++++++
test/BUILD.bazel | 2 +
test/iobuf_unittest.cpp | 313 +++++++++++++++++++++++++++++++++++++++++++++++-
6 files changed, 618 insertions(+), 3 deletions(-)
diff --git a/MODULE.bazel b/MODULE.bazel
index bd5b8e38..b855cd50 100644
--- a/MODULE.bazel
+++ b/MODULE.bazel
@@ -44,6 +44,7 @@ bazel_dep(name = 'thrift', version = '0.21.0', repo_name =
'org_apache_thrift')
# test only
bazel_dep(name = "gperftools", version = "2.18.1", dev_dependency = True)
+bazel_dep(name = "nlohmann_json", version = "3.12.0", dev_dependency = True)
bazel_dep(name = 'googletest', version = '1.14.0.bcr.1', repo_name =
'com_google_googletest', dev_dependency = True)
bazel_dep(name = 'hedron_compile_commands', dev_dependency = True)
git_override(
diff --git a/WORKSPACE b/WORKSPACE
index a107f0a5..78a6c283 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -279,6 +279,24 @@ http_archive(
urls =
["https://archive.apache.org/dist/thrift/0.15.0/thrift-0.15.0.tar.gz"],
)
+# Header-only JSON library used by iobuf_unittest's IOBuf<->std::iostream
+# adapter tests. Keep version in sync with MODULE.bazel.
+http_archive(
+    name = "nlohmann_json",
+    build_file_content = """
+cc_library(
+    name = "json",
+    hdrs = ["single_include/nlohmann/json.hpp", "single_include/nlohmann/json_fwd.hpp"],
+    strip_include_prefix = "single_include",
+    visibility = ["//visibility:public"],
+)
+""",
+    sha256 = "4b92eb0c06d10683f7447ce9406cb97cd4b453be18d7279320f7b2f025c10187",
+    strip_prefix = "json-3.12.0",
+    urls = ["https://github.com/nlohmann/json/archive/refs/tags/v3.12.0.tar.gz"],
+)
+
+
#
# Perl Dependencies
#
diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp
index af77d968..01469e22 100644
--- a/src/butil/iobuf.cpp
+++ b/src/butil/iobuf.cpp
@@ -30,6 +30,8 @@
#include <fcntl.h> // O_RDONLY
#include <errno.h> // errno
#include <limits.h> // CHAR_BIT
+#include <algorithm> // std::min
+#include <limits> // std::numeric_limits
#include <stdexcept> // std::invalid_argument
#include <gflags/gflags.h> // gflags
#include "butil/build_config.h" // ARCH_CPU_X86_64
@@ -2025,6 +2027,130 @@ void IOBufAsZeroCopyOutputStream::_release_block() {
_cur_block = NULL;
}
+// Makes the next non-empty backing block of the source IOBuf the get area.
+// Returns the first character of the get area without consuming it, or
+// traits_type::eof() once every backing block has been visited.
+std::streambuf::int_type IOBufAsInputStreamBuf::underflow() {
+    // Honor the std::streambuf contract: when unread characters remain in
+    // the current get area, report the first of them instead of jumping to
+    // the next block and silently dropping the rest of this one.
+    if (gptr() < egptr()) {
+        return traits_type::to_int_type(*gptr());
+    }
+    const size_t block_num = _buf.backing_block_num();
+    while (_block_index < block_num) {
+        const StringPiece blk = _buf.backing_block(_block_index++);
+        if (blk.empty()) {
+            continue;  // Zero-length blocks carry no data; skip them.
+        }
+        // const_cast is safe here: setg() takes char* by API contract, but
+        // this streambuf never writes through it (no overflow/sputc path).
+        char* p = const_cast<char*>(blk.data());
+        setg(p, p, p + blk.size());
+        return traits_type::to_int_type(*p);
+    }
+    return traits_type::eof();
+}
+
+// Bulk read: copies up to `n' bytes into `s', draining one backing block at
+// a time. Returns the number of bytes copied, which is smaller than `n' only
+// when the source runs out of data.
+std::streamsize IOBufAsInputStreamBuf::xsgetn(char* s, std::streamsize n) {
+    // gbump() takes an int, so a single step must never exceed INT_MAX even
+    // if a user-data block is larger than 2GB.
+    const std::streamsize max_bump =
+        static_cast<std::streamsize>(std::numeric_limits<int>::max());
+    std::streamsize copied = 0;
+    while (copied < n) {
+        if (gptr() == egptr() && underflow() == traits_type::eof()) {
+            break;  // Get area drained and no further block to load.
+        }
+        const std::streamsize in_block = egptr() - gptr();
+        const std::streamsize wanted = n - copied;
+        std::streamsize chunk = std::min(in_block, wanted);
+        chunk = std::min(chunk, max_bump);
+        iobuf::cp(s + copied, gptr(), static_cast<size_t>(chunk));
+        gbump(static_cast<int>(chunk));
+        copied += chunk;
+    }
+    return copied;
+}
+
+// Reports how many bytes can still be read: the unread part of the current
+// get area plus the sizes of all backing blocks not loaded yet. The sum
+// saturates at the maximum std::streamsize. Returns -1 when nothing is left,
+// which is how std::streambuf signals that further underflow() calls will
+// fail (a plain 0 would only mean "unknown").
+std::streamsize IOBufAsInputStreamBuf::showmanyc() {
+    const std::streamsize kMax = std::numeric_limits<std::streamsize>::max();
+    std::streamsize n = egptr() - gptr();
+    const size_t block_num = _buf.backing_block_num();
+    for (size_t i = _block_index; i < block_num; ++i) {
+        const std::streamsize sz =
+            static_cast<std::streamsize>(_buf.backing_block(i).size());
+        // Saturate instead of overflowing on pathologically large IOBufs.
+        if (n > kMax - sz) {
+            return kMax;
+        }
+        n += sz;
+    }
+    // Every remaining block was empty (or there was none): end of sequence.
+    return n > 0 ? n : static_cast<std::streamsize>(-1);
+}
+
+// Returns the unwritten tail of the current put area to the IOBuf so that
+// its length matches the bytes actually written.
+IOBufAsOutputStreamBuf::~IOBufAsOutputStreamBuf() {
+    shrink();
+}
+
+// Gives the unused remainder of the current put area back to the underlying
+// zero-copy stream and clears the put area. Does nothing when no put area is
+// outstanding.
+void IOBufAsOutputStreamBuf::shrink() {
+    if (pbase() == NULL) {
+        return;
+    }
+    // _zc.BackUp takes int. A put area comes from a single Next() call whose
+    // size is an int, so the tail already fits; the clamp is only defensive.
+    const std::streamsize tail = epptr() - pptr();
+    const std::streamsize cap =
+        static_cast<std::streamsize>(std::numeric_limits<int>::max());
+    _zc.BackUp(static_cast<int>(std::min(tail, cap)));
+    setp(NULL, NULL);
+}
+
+// Stores one character, acquiring a fresh put area first when the current
+// one is full (or absent).
+// Returns a non-eof value when `ch' is eof (nothing to write), the stored
+// character on success, and traits_type::eof() when no space can be obtained.
+std::streambuf::int_type IOBufAsOutputStreamBuf::overflow(int_type ch) {
+    if (traits_type::eq_int_type(ch, traits_type::eof())) {
+        return traits_type::not_eof(ch);
+    }
+    // Only replace the put area when it is really exhausted; replacing a
+    // partially filled one would leave its unwritten tail in the IOBuf.
+    if (pptr() == epptr()) {
+        // A zero-length area is treated as failure, like xsputn() does.
+        // Going through sputc() here would re-enter overflow() endlessly.
+        if (!refresh_put_area() || pptr() == epptr()) {
+            return traits_type::eof();
+        }
+    }
+    const char_type c = traits_type::to_char_type(ch);
+    *pptr() = c;
+    pbump(1);
+    return traits_type::to_int_type(c);
+}
+
+// Bulk write: copies `n' bytes from `s' into the put area, fetching a new
+// block each time the current one fills up. Returns the number of bytes
+// written, which is smaller than `n' only when no further space could be
+// obtained.
+std::streamsize IOBufAsOutputStreamBuf::xsputn(
+    const char* s, std::streamsize n) {
+    // pbump() takes an int, so a single step must never exceed INT_MAX even
+    // if a dedicated block is larger than 2GB.
+    const std::streamsize max_bump =
+        static_cast<std::streamsize>(std::numeric_limits<int>::max());
+    std::streamsize written = 0;
+    while (written < n) {
+        if (pptr() == epptr()) {
+            // Put area is full or not set up yet. Stop if a new one cannot
+            // be obtained or if it comes back empty.
+            if (!refresh_put_area() || pptr() == epptr()) {
+                break;
+            }
+        }
+        const std::streamsize room = epptr() - pptr();
+        const std::streamsize pending = n - written;
+        std::streamsize chunk = std::min(room, pending);
+        chunk = std::min(chunk, max_bump);
+        iobuf::cp(pptr(), s + written, static_cast<size_t>(chunk));
+        pbump(static_cast<int>(chunk));
+        written += chunk;
+    }
+    return written;
+}
+
+// Flush hook: hands the unused tail of the put area back via shrink() so the
+// IOBuf length equals the bytes written so far. Always returns 0 (success).
+int IOBufAsOutputStreamBuf::sync() {
+    shrink();
+    return 0;
+}
+
+// Asks the zero-copy stream for its next writable region and installs it as
+// the put area. On failure the put area is cleared. Returns whether a region
+// was obtained.
+bool IOBufAsOutputStreamBuf::refresh_put_area() {
+    void* region = NULL;
+    int region_len = 0;
+    const bool ok = _zc.Next(&region, &region_len);
+    if (ok) {
+        char* const begin = static_cast<char*>(region);
+        setp(begin, begin + region_len);
+    } else {
+        setp(NULL, NULL);
+    }
+    return ok;
+}
+
IOBufAsSnappySink::IOBufAsSnappySink(butil::IOBuf& buf)
: _cur_buf(NULL), _cur_len(0), _buf(&buf), _buf_stream(&buf) {
}
diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h
index 978aa34f..b92a2e3d 100644
--- a/src/butil/iobuf.h
+++ b/src/butil/iobuf.h
@@ -25,6 +25,8 @@
#include <sys/uio.h> // iovec
#include <stdint.h> // uint32_t, int64_t
#include <functional>
+#include <istream> // std::istream
+#include <streambuf> // std::streambuf
#include <string> // std::string
#include <ostream> // std::ostream
#include <google/protobuf/io/zero_copy_stream.h> // ZeroCopyInputStream
@@ -609,6 +611,165 @@ private:
int64_t _byte_count;
};
+// Read-only std::streambuf adapter over an IOBuf, for parsers that consume a
+// std::istream (e.g. nlohmann::json::parse(std::istream&)).
+//
+// The adapter exposes the IOBuf's backing blocks one after another as the
+// get area and never writes to them. It is forward-only: seekoff/seekpos are
+// not overridden.
+//
+// NOTE: The source IOBuf MUST NOT be modified during the lifetime of this
+// streambuf, otherwise the StringPieces returned by backing_block() may be
+// invalidated and the stream will read garbage or crash.
+class IOBufAsInputStreamBuf : public std::streambuf {
+public:
+    // `buf' must outlive this streambuf and must not be modified while the
+    // streambuf is in use.
+    explicit IOBufAsInputStreamBuf(const IOBuf& buf)
+        : _buf(buf), _block_index(0) {}
+
+protected:
+    // Loads the next non-empty backing block into the get area.
+    int_type underflow() override;
+    // Bulk copy that walks the backing blocks.
+    std::streamsize xsgetn(char* s, std::streamsize n) override;
+    // Number of bytes still readable, summed over the remaining blocks.
+    std::streamsize showmanyc() override;
+
+private:
+    const IOBuf& _buf;    // Source being read; not owned.
+    size_t _block_index;  // Index of the next backing block to load.
+};
+
+// std::istream facade over an IOBuf, backed by IOBufAsInputStreamBuf.
+// The IOBuf must outlive this stream and must not be modified while the
+// stream is in use. Forward-only: seeking is not supported.
+//
+// Feeding an IOBuf to a parser that takes std::istream&, e.g.
+// nlohmann::json:
+//
+//   butil::IOBufInputStream in(request_body);
+//   auto j = nlohmann::json::parse(in);
+//
+// Formatted extraction:
+//
+//   butil::IOBuf buf;
+//   buf.append("42 3.14 hello");
+//   butil::IOBufInputStream in(buf);
+//   int i; double d;
+//   std::string s;
+//   in >> i >> d >> s;
+//
+// Bulk read into a buffer (goes through xsgetn, copies one block at a time):
+//
+//   std::string out(buf.length(), '\0');
+//   butil::IOBufInputStream in(buf);
+//   in.read(&out[0], out.size());
+class IOBufInputStream : public std::istream {
+public:
+    // `buf' must outlive this stream and must not be modified while the
+    // stream is in use.
+    explicit IOBufInputStream(const IOBuf& buf)
+        : std::istream(NULL)
+        , _sb(buf) {
+        // The base class is constructed before _sb, so it starts without a
+        // buffer and is attached here once _sb exists.
+        rdbuf(&_sb);
+    }
+
+private:
+    IOBufAsInputStreamBuf _sb;
+};
+
+// Append-only std::streambuf adapter over an IOBuf, for serializers that
+// write to a std::ostream (e.g. nlohmann::json's `os << j`). Bytes are
+// written directly into IOBuf blocks with no intermediate string copy.
+//
+// Internally backed by IOBufAsZeroCopyOutputStream:
+//   - by default, blocks are taken from the per-thread TLS pool (8KB);
+//   - pass `block_size` to allocate dedicated blocks instead, which avoids
+//     fragmenting the TLS pool when many output streams coexist.
+//
+// seekoff/seekpos are not overridden, so seeking is not supported.
+//
+// IMPORTANT: The length of the target IOBuf is exact only after shrink(),
+// a flush, or destruction. Each put area claims the whole remainder of a
+// block and the unused tail is handed back later. To get the precise length
+// mid-stream, call shrink() on this streambuf or flush() on the owning
+// stream.
+class IOBufAsOutputStreamBuf : public std::streambuf {
+public:
+    // `buf' must outlive this streambuf.
+    explicit IOBufAsOutputStreamBuf(IOBuf& buf)
+        : _zc(&buf) {}
+    IOBufAsOutputStreamBuf(IOBuf& buf, uint32_t block_size)
+        : _zc(&buf, block_size) {}
+    ~IOBufAsOutputStreamBuf() override;
+
+    // Return the unused tail of the current put area to the underlying IOBuf
+    // so that the IOBuf length matches exactly what was written so far.
+    // Also invoked from sync() and from the destructor.
+    void shrink();
+
+protected:
+    // Stores one character, fetching a new put area when needed.
+    int_type overflow(int_type ch) override;
+    // Bulk copy that spills over into new blocks as needed.
+    std::streamsize xsputn(const char* s, std::streamsize n) override;
+    // Flush hook; calls shrink().
+    int sync() override;
+
+private:
+    // Installs the next writable region as the put area; false on failure.
+    bool refresh_put_area();
+
+    IOBufAsZeroCopyOutputStream _zc;
+};
+
+// std::ostream facade over an IOBuf, backed by IOBufAsOutputStreamBuf.
+// Written bytes are appended to the referenced IOBuf, which must outlive
+// this stream. Append-only: seeking is not supported.
+//
+// The IOBuf's length is exact only after flush() or destruction of the
+// stream; before that it may include the unused tail of the current block.
+//
+// Serialize with a library that writes to std::ostream&, e.g. nlohmann::json
+// (no intermediate string copy; bytes go straight into IOBuf blocks):
+//
+//   #include <nlohmann/json.hpp>
+//   using nlohmann::json;
+//
+//   json reply = {
+//       {"status", "ok"},
+//       {"items", {1, 2, 3}},
+//   };
+//
+//   butil::IOBuf out;  // e.g. controller->response_attachment()
+//   {
+//       butil::IOBufOutputStream os(out);
+//       // compact: {"items":[1,2,3],"status":"ok"}
+//       os << reply;
+//       // pretty-print with 2-space indent:
+//       // os << std::setw(2) << reply;
+//   }  // dtor runs shrink(); `out` now has the exact serialized bytes.
+//
+// Formatted insertion (works like any std::ostream):
+//
+//   butil::IOBuf out;
+//   butil::IOBufOutputStream os(out);
+//   os << "x=" << 42 << " y=" << 3.14;
+//   os.flush();  // commit to `out` now
+//
+// Bulk write of a known-size buffer (goes through xsputn, memcpy per block):
+//
+//   butil::IOBufOutputStream os(out);
+//   os.write(payload.data(), payload.size());
+//
+// Pass `block_size` when many output streams coexist in one thread, to keep
+// each stream's allocations off the shared TLS block pool:
+//
+//   butil::IOBufOutputStream os(out, /*block_size=*/64 * 1024);
+class IOBufOutputStream : public std::ostream {
+public:
+    // `buf' must outlive this stream.
+    explicit IOBufOutputStream(IOBuf& buf)
+        : std::ostream(NULL)
+        , _sb(buf) {
+        // The base class is constructed before _sb, so attach the buffer
+        // here once it exists.
+        rdbuf(&_sb);
+    }
+    // Same as above, but the streambuf allocates dedicated blocks of
+    // `block_size' instead of using the TLS block pool.
+    IOBufOutputStream(IOBuf& buf, uint32_t block_size)
+        : std::ostream(NULL)
+        , _sb(buf, block_size) {
+        rdbuf(&_sb);
+    }
+
+private:
+    IOBufAsOutputStreamBuf _sb;
+};
+
// Wrap IOBuf into input of snappy compression.
class IOBufAsSnappySource : public butil::snappy::Source {
public:
diff --git a/test/BUILD.bazel b/test/BUILD.bazel
index 13ef5922..d442d3b7 100644
--- a/test/BUILD.bazel
+++ b/test/BUILD.bazel
@@ -179,12 +179,14 @@ cc_test(
"test_switches.h",
],
copts = COPTS,
+ defines = ["HAS_NLOHMANN_JSON=1"],
deps = [
":cc_test_proto",
":sstream_workaround",
":gperftools_helper",
"//:butil",
"//:bthread",
+ "@nlohmann_json//:json",
"@com_google_googletest//:gtest",
],
)
diff --git a/test/iobuf_unittest.cpp b/test/iobuf_unittest.cpp
index 489460e2..82112045 100644
--- a/test/iobuf_unittest.cpp
+++ b/test/iobuf_unittest.cpp
@@ -23,6 +23,9 @@
#include <stdlib.h>
#include <memory>
#include <cstring>
+#if HAS_NLOHMANN_JSON
+#include <nlohmann/json.hpp>
+#endif // HAS_NLOHMANN_JSON
#include <butil/files/temp_file.h> // TempFile
#include <butil/containers/flat_map.h>
#include <butil/macros.h>
@@ -52,8 +55,8 @@ extern void release_tls_block_chain(IOBuf::Block* b);
extern uint32_t block_cap(IOBuf::Block const* b);
extern uint32_t block_size(IOBuf::Block const* b);
extern IOBuf::Block* get_portal_next(IOBuf::Block const* b);
-}
-}
+} // namespace iobuf
+} // namespace butil
namespace {
@@ -1939,11 +1942,315 @@ TEST_F(IOBufTest, single_iobuf) {
ASSERT_EQ(null_buf, nullptr);
uint32_t old_size = sbuf3.get_length();
- void *p = sbuf3.reallocate_downward(old_size + 16, 0, old_size);
+ void *p = sbuf3.reallocate_downward(old_size + 16, 0, old_size);
ASSERT_TRUE(p != nullptr);
old_size = sbuf3.get_length();
p = sbuf3.reallocate_downward(old_size + 16, old_size, 0);
ASSERT_TRUE(p != nullptr);
}
+// Whitespace-separated extraction over a small IOBuf.
+TEST_F(IOBufTest, as_input_stream_basic) {
+    butil::IOBuf source;
+    source.append("hello world");
+
+    butil::IOBufInputStream in(source);
+    std::string word;
+    in >> word;
+    ASSERT_EQ("hello", word);
+    in >> word;
+    ASSERT_EQ("world", word);
+    ASSERT_EQ(EOF, in.peek());
+
+    // Reading through the stream must leave the source IOBuf untouched.
+    ASSERT_EQ("hello world", source.to_string());
+}
+
+// An empty IOBuf yields EOF immediately.
+TEST_F(IOBufTest, as_input_stream_empty) {
+    butil::IOBuf empty_buf;
+    butil::IOBufInputStream in(empty_buf);
+    ASSERT_EQ(EOF, in.peek());
+    char ch;
+    ASSERT_FALSE(in.get(ch));
+    ASSERT_TRUE(in.eof());
+}
+
+// The stream can be built from a const IOBuf reference.
+TEST_F(IOBufTest, as_input_stream_accepts_const_iobuf) {
+    butil::IOBuf buf;
+    buf.append("abc");
+    const butil::IOBuf& cbuf = buf;
+    butil::IOBufInputStream stream(cbuf);
+    const char expected[] = {'a', 'b', 'c'};
+    for (const char want : expected) {
+        char got;
+        ASSERT_TRUE(stream.get(got));
+        ASSERT_EQ(want, got);
+    }
+    ASSERT_EQ(EOF, stream.peek());
+}
+
+// Appends `payload' to `buf' in pieces of at most `chunk' bytes, each through
+// its own append_user_data() call. Every call adds a separate BlockRef, so
+// the result is a multi-block IOBuf that exercises underflow() across block
+// boundaries.
+static void append_as_separate_blocks(butil::IOBuf* buf,
+                                      const std::string& payload,
+                                      size_t chunk) {
+    size_t offset = 0;
+    while (offset < payload.size()) {
+        const size_t piece_len = std::min(chunk, payload.size() - offset);
+        char* piece = static_cast<char*>(malloc(piece_len));
+        memcpy(piece, payload.data() + offset, piece_len);
+        ASSERT_EQ(0, buf->append_user_data(piece, piece_len, free));
+        offset += chunk;
+    }
+}
+
+// A single read() call gathers data spread over several user-data blocks.
+TEST_F(IOBufTest, as_input_stream_multi_block_read) {
+    const std::string payload = "the quick brown fox jumps over the lazy dog";
+    butil::IOBuf buf;
+    append_as_separate_blocks(&buf, payload, 7);
+    ASSERT_GT(buf.backing_block_num(), 1u);
+
+    std::string got(payload.size(), '\0');
+    butil::IOBufInputStream in(buf);
+    in.read(&got[0], got.size());
+    ASSERT_EQ(static_cast<std::streamsize>(payload.size()), in.gcount());
+    ASSERT_EQ(payload, got);
+    ASSERT_EQ(EOF, in.peek());
+}
+
+// Payload >> DEFAULT_BLOCK_SIZE (8192) forces multiple blocks even with a
+// single append call.
+TEST_F(IOBufTest, as_input_stream_large_payload) {
+    const size_t kPayloadSize = 100 * 1024;
+    std::string payload(kPayloadSize, '\0');
+    for (size_t i = 0; i < kPayloadSize; ++i) {
+        payload[i] = static_cast<char>('a' + (i % 26));
+    }
+    butil::IOBuf buf;
+    buf.append(payload);
+    ASSERT_GT(buf.backing_block_num(), 1u);
+
+    std::string got(payload.size(), '\0');
+    butil::IOBufInputStream in(buf);
+    in.read(&got[0], got.size());
+    ASSERT_EQ(static_cast<std::streamsize>(payload.size()), in.gcount());
+    ASSERT_EQ(payload, got);
+}
+
+// The per-byte and the bulk read paths must produce the same bytes.
+TEST_F(IOBufTest, as_input_stream_get_matches_read) {
+    const std::string payload = "the quick brown fox jumps over the lazy dog";
+    butil::IOBuf buf;
+    append_as_separate_blocks(&buf, payload, 7);
+
+    // Byte-by-byte path (sbumpc).
+    std::string by_byte;
+    butil::IOBufInputStream byte_stream(buf);
+    for (char ch; byte_stream.get(ch);) {
+        by_byte.push_back(ch);
+    }
+    ASSERT_EQ(payload, by_byte);
+
+    // Bulk path (xsgetn).
+    std::string by_bulk(payload.size(), '\0');
+    butil::IOBufInputStream bulk_stream(buf);
+    bulk_stream.read(&by_bulk[0], by_bulk.size());
+    ASSERT_EQ(static_cast<std::streamsize>(payload.size()),
+              bulk_stream.gcount());
+    ASSERT_EQ(payload, by_bulk);
+}
+
+// Requesting more bytes than available transfers what exists and hits EOF.
+TEST_F(IOBufTest, as_input_stream_short_read_at_eof) {
+    butil::IOBuf buf;
+    buf.append("abcd");
+    butil::IOBufInputStream in(buf);
+
+    char dest[8] = {};
+    in.read(dest, sizeof(dest));
+    // istream sets failbit on a short read at EOF, but gcount() still
+    // reports the number of bytes actually transferred.
+    ASSERT_EQ(4, in.gcount());
+    ASSERT_EQ(0, memcmp(dest, "abcd", 4));
+    ASSERT_TRUE(in.eof());
+}
+
+// in_avail() on a fresh stream reports the total size of all blocks.
+TEST_F(IOBufTest, as_input_stream_in_avail) {
+    const std::string parts[] = {"aaa", "bbbb", "ccccc"};
+    butil::IOBuf buf;
+    size_t expected_total = 0;
+    for (const std::string& part : parts) {
+        char* copy = static_cast<char*>(malloc(part.size()));
+        memcpy(copy, part.data(), part.size());
+        ASSERT_EQ(0, buf.append_user_data(copy, part.size(), free));
+        expected_total += part.size();
+    }
+
+    butil::IOBufInputStream in(buf);
+    // The get area is still empty, so in_avail() defers to showmanyc(),
+    // which must sum all remaining backing blocks.
+    ASSERT_EQ(static_cast<std::streamsize>(expected_total),
+              in.rdbuf()->in_avail());
+}
+
+// Formatted insertion of mixed types lands in the IOBuf.
+TEST_F(IOBufTest, as_output_stream_basic) {
+    butil::IOBuf sink;
+    {
+        butil::IOBufOutputStream os(sink);
+        os << "hello " << 42 << ' ' << 3.5;
+    }  // The destructor calls shrink().
+    ASSERT_EQ("hello 42 3.5", sink.to_string());
+}
+
+// Writing through the stream appends after existing IOBuf content.
+TEST_F(IOBufTest, as_output_stream_appends_not_overwrites) {
+    butil::IOBuf sink;
+    sink.append("prefix:");
+    {
+        butil::IOBufOutputStream os(sink);
+        os << "payload";
+    }
+    ASSERT_EQ("prefix:payload", sink.to_string());
+}
+
+// A write larger than one block (DEFAULT_BLOCK_SIZE == 8192) spans several
+// blocks and round-trips intact.
+TEST_F(IOBufTest, as_output_stream_large_payload) {
+    const size_t kPayloadSize = 100 * 1024;
+    std::string payload(kPayloadSize, '\0');
+    for (size_t i = 0; i < kPayloadSize; ++i) {
+        payload[i] = static_cast<char>('a' + (i % 26));
+    }
+    butil::IOBuf sink;
+    {
+        butil::IOBufOutputStream os(sink);
+        os.write(payload.data(), payload.size());
+        ASSERT_TRUE(os.good());
+    }
+    ASSERT_GT(sink.backing_block_num(), 1u);
+    ASSERT_EQ(payload, sink.to_string());
+}
+
+// Same payload, two write paths: bulk write() vs per-byte put().
+TEST_F(IOBufTest, as_output_stream_xsputn_matches_overflow) {
+    const std::string payload = "the quick brown fox jumps over the lazy dog "
+                                "0123456789 alpha beta gamma";
+    butil::IOBuf bulk_buf;
+    {
+        butil::IOBufOutputStream bulk_os(bulk_buf);
+        bulk_os.write(payload.data(), payload.size());
+    }
+    butil::IOBuf byte_buf;
+    {
+        butil::IOBufOutputStream byte_os(byte_buf);
+        for (size_t i = 0; i < payload.size(); ++i) {
+            byte_os.put(payload[i]);
+        }
+    }
+    ASSERT_EQ(payload, bulk_buf.to_string());
+    ASSERT_EQ(payload, byte_buf.to_string());
+}
+
+// Without flush(), IOBuf::length() may exceed the bytes written because the
+// put area claims the rest of the current block. flush() must reconcile it.
+TEST_F(IOBufTest, as_output_stream_flush_shrinks_eagerly) {
+    butil::IOBuf sink;
+    butil::IOBufOutputStream os(sink);
+
+    os << "abc";
+    os.flush();
+    ASSERT_EQ(3u, sink.length());
+    ASSERT_EQ("abc", sink.to_string());
+
+    // A second round after the flush keeps appending to the same IOBuf.
+    os << "defg";
+    os.flush();
+    ASSERT_EQ(7u, sink.length());
+    ASSERT_EQ("abcdefg", sink.to_string());
+}
+
+// Passing block_size routes through create_block instead of the TLS pool.
+// A small-but-valid block size forces many allocations.
+TEST_F(IOBufTest, as_output_stream_dedicated_block_size) {
+    const std::string payload(4096, 'z');
+    butil::IOBuf sink;
+    {
+        butil::IOBufOutputStream os(sink, /*block_size=*/256);
+        os.write(payload.data(), payload.size());
+    }
+    ASSERT_EQ(payload, sink.to_string());
+    ASSERT_GT(sink.backing_block_num(), 1u);
+}
+
+// Write through IOBufOutputStream, read back through IOBufInputStream.
+TEST_F(IOBufTest, as_output_stream_round_trip_with_input_stream) {
+    const int kCount = 1000;
+    butil::IOBuf buf;
+    {
+        butil::IOBufOutputStream writer(buf);
+        for (int n = 0; n < kCount; ++n) {
+            writer << n << '\n';
+        }
+    }
+    butil::IOBufInputStream reader(buf);
+    for (int expected = 0; expected < kCount; ++expected) {
+        int parsed = -1;
+        reader >> parsed;
+        ASSERT_EQ(expected, parsed);
+    }
+}
+
+#if HAS_NLOHMANN_JSON
+// End-to-end check that the IOBuf <-> std::iostream adapters cooperate with
+// nlohmann::json: serialize a document into one IOBuf, parse it back, and
+// repeat with pretty-printing.
+TEST_F(IOBufTest, as_stream_nlohmann_json_round_trip) {
+    const nlohmann::json reply = {
+        {"status", "ok"},
+        {"code", 200},
+        {"items", {1, 2, 3, 4, 5}},
+        {"nested", {{"a", "alpha"}, {"b", "beta"}}},
+    };
+
+    // 1. Compact serialization through IOBufOutputStream.
+    butil::IOBuf compact;
+    {
+        butil::IOBufOutputStream os(compact);
+        os << reply;
+    }  // dtor runs shrink(); `compact` now holds exactly the serialized bytes.
+    ASSERT_EQ(reply.dump(), compact.to_string());
+
+    // 2. Parse it back through IOBufInputStream and verify the round trip.
+    butil::IOBufInputStream compact_in(compact);
+    const nlohmann::json parsed = nlohmann::json::parse(compact_in);
+    ASSERT_EQ(reply, parsed);
+    ASSERT_EQ("ok", parsed["status"]);
+    ASSERT_EQ(200, parsed["code"]);
+    ASSERT_EQ(5u, parsed["items"].size());
+    ASSERT_EQ("alpha", parsed["nested"]["a"]);
+
+    // 3. Pretty-print via std::setw, then re-parse: the stream's formatting
+    //    state must reach the serializer through IOBufOutputStream.
+    butil::IOBuf pretty;
+    {
+        butil::IOBufOutputStream os(pretty);
+        os << std::setw(2) << reply;
+    }
+    ASSERT_EQ(reply.dump(2), pretty.to_string());
+    butil::IOBufInputStream pretty_in(pretty);
+    ASSERT_EQ(reply, nlohmann::json::parse(pretty_in));
+}
+
+// A JSON array large enough to span several IOBuf blocks
+// (DEFAULT_BLOCK_SIZE == 8192), so writing and parsing both cross block
+// boundaries.
+TEST_F(IOBufTest, as_stream_nlohmann_json_large_array) {
+    nlohmann::json squares = nlohmann::json::array();
+    for (int i = 0; i < 5000; ++i) {
+        squares.push_back({{"i", i}, {"sq", i * i}});
+    }
+
+    butil::IOBuf buf;
+    {
+        butil::IOBufOutputStream os(buf);
+        os << squares;
+    }
+    ASSERT_GT(buf.backing_block_num(), 1u) << "payload should span >1 block";
+    ASSERT_EQ(squares.dump(), buf.to_string());
+
+    butil::IOBufInputStream in(buf);
+    const nlohmann::json parsed = nlohmann::json::parse(in);
+    ASSERT_EQ(squares, parsed);
+    ASSERT_EQ(5000u, parsed.size());
+    ASSERT_EQ(4999, parsed[4999]["i"]);
+    ASSERT_EQ(4999 * 4999, parsed[4999]["sq"]);
+}
+#endif // HAS_NLOHMANN_JSON
} // namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]