This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 156b151301a [refine](Buffer) Rename and classify in BufferReadable and 
BufferWritable (#52735)
156b151301a is described below

commit 156b151301a95525093e0ef3f535d3dd01a2d98f
Author: Mryange <[email protected]>
AuthorDate: Thu Jul 10 08:48:07 2025 +0800

    [refine](Buffer) Rename and classify in BufferReadable and BufferWritable 
(#52735)
    
    ### What problem does this PR solve?
    
    ### Release note
    
    None
---
 be/src/util/counts.h                               |   4 +-
 .../aggregate_function_approx_count_distinct.h     |   4 +-
 .../aggregate_function_approx_top_k.h              |  24 ++--
 .../aggregate_function_approx_top_sum.h            |  24 ++--
 .../aggregate_function_array_agg.h                 |  24 ++--
 .../aggregate_functions/aggregate_function_avg.h   |   8 +-
 .../aggregate_function_avg_weighted.h              |   8 +-
 .../aggregate_functions/aggregate_function_bit.h   |   4 +-
 .../aggregate_function_collect.h                   |  36 ++---
 .../aggregate_functions/aggregate_function_corr.h  |  48 +++----
 .../aggregate_functions/aggregate_function_count.h |   8 +-
 .../aggregate_function_count_by_enum.h             |  28 ++--
 .../aggregate_functions/aggregate_function_covar.h |  16 +--
 .../aggregate_function_distinct.h                  |  16 +--
 .../aggregate_function_foreach.h                   |   4 +-
 .../aggregate_function_group_array_intersect.h     |  32 ++---
 .../aggregate_function_group_concat.h              |  12 +-
 .../aggregate_function_histogram.h                 |  16 +--
 .../aggregate_function_hll_union_agg.h             |   4 +-
 .../aggregate_function_java_udaf.h                 |   4 +-
 .../aggregate_function_linear_histogram.h          |  28 ++--
 .../aggregate_functions/aggregate_function_map.h   |  15 +-
 .../aggregate_function_map_v2.h                    |   8 +-
 .../aggregate_function_min_max.h                   |  28 ++--
 .../aggregate_function_min_max_by.h                |   4 +-
 .../aggregate_functions/aggregate_function_null.h  |   6 +-
 .../aggregate_function_orthogonal_bitmap.h         |  36 ++---
 .../aggregate_function_percentile.h                |  28 ++--
 .../aggregate_function_product.h                   |  12 +-
 .../aggregate_function_regr_union.h                |  20 +--
 .../aggregate_functions/aggregate_function_rpc.h   |   4 +-
 .../aggregate_function_sequence_match.h            |  28 ++--
 .../aggregate_functions/aggregate_function_sort.h  |   4 +-
 .../aggregate_function_stddev.h                    |  12 +-
 .../aggregate_functions/aggregate_function_sum.h   |   4 +-
 .../aggregate_functions/aggregate_function_topn.h  |  20 +--
 .../aggregate_functions/aggregate_function_uniq.h  |  12 +-
 .../aggregate_function_uniq_distribute_key.h       |   4 +-
 be/src/vec/aggregate_functions/moments.h           |   4 +-
 be/src/vec/common/hash_table/hash_table.h          |  10 +-
 be/src/vec/common/space_saving.h                   |  24 ++--
 be/src/vec/common/string_buffer.hpp                | 109 +++++++++++++++
 be/src/vec/data_types/data_type_bitmap.cpp         |   4 +-
 be/src/vec/data_types/data_type_hll.cpp            |   4 +-
 be/src/vec/data_types/data_type_quantilestate.cpp  |   4 +-
 be/src/vec/io/io_helper.h                          | 154 ---------------------
 be/src/vec/io/var_int.h                            |  43 +-----
 be/test/vec/common/string_buffer_test.cpp          |  73 ++++++++++
 be/test/vec/core/field_test.cpp                    |  14 +-
 49 files changed, 514 insertions(+), 526 deletions(-)

diff --git a/be/src/util/counts.h b/be/src/util/counts.h
index 43b815cf8ca..67880268da2 100644
--- a/be/src/util/counts.h
+++ b/be/src/util/counts.h
@@ -59,7 +59,7 @@ public:
         if (!_nums.empty()) {
             pdqsort(_nums.begin(), _nums.end());
             size_t size = _nums.size();
-            write_binary(size, buf);
+            buf.write_binary(size);
             buf.write(reinterpret_cast<const char*>(_nums.data()), sizeof(Ty) 
* size);
         } else {
             // convert _sorted_nums_vec to _nums and do seiralize again
@@ -70,7 +70,7 @@ public:
 
     void unserialize(vectorized::BufferReadable& buf) {
         size_t size;
-        read_binary(size, buf);
+        buf.read_binary(size);
         _nums.resize(size);
         auto buff = buf.read(sizeof(Ty) * size);
         memcpy(_nums.data(), buff.data, buff.size);
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.h 
b/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.h
index be67927c0eb..0efcaaad22b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.h
@@ -63,12 +63,12 @@ struct AggregateFunctionApproxCountDistinctData {
         std::string result;
         result.resize(hll_data.max_serialized_size());
         result.resize(hll_data.serialize((uint8_t*)result.data()));
-        write_binary(result, buf);
+        buf.write_binary(result);
     }
 
     void read(BufferReadable& buf) {
         StringRef result;
-        read_binary(result, buf);
+        buf.read_binary(result);
         Slice data = Slice(result.data, result.size);
         hll_data.deserialize(data);
     }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.h 
b/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.h
index 857df1ea3db..75adac6136a 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_approx_top_k.h
@@ -73,12 +73,12 @@ public:
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
         this->data(place).value.write(buf);
 
-        write_var_uint(_column_names.size(), buf);
+        buf.write_var_uint(_column_names.size());
         for (const auto& column_name : _column_names) {
-            write_string_binary(column_name, buf);
+            buf.write_binary(column_name);
         }
-        write_var_uint(_threshold, buf);
-        write_var_uint(_reserved, buf);
+        buf.write_var_uint(_threshold);
+        buf.write_var_uint(_reserved);
     }
 
     // Deserializes the aggregate function's state from a buffer (including 
the SpaceSaving structure and threshold).
@@ -86,7 +86,7 @@ public:
                      Arena* arena) const override {
         auto readStringBinaryInto = [](Arena& arena, BufferReadable& buf) {
             uint64_t size = 0;
-            read_var_uint(size, buf);
+            buf.read_var_uint(size);
 
             if (UNLIKELY(size > DEFAULT_MAX_STRING_SIZE)) {
                 throw Exception(ErrorCode::INTERNAL_ERROR, "Too large string 
size.");
@@ -102,7 +102,7 @@ public:
         set.clear();
 
         uint64_t size = 0;
-        read_var_uint(size, buf);
+        buf.read_var_uint(size);
         if (UNLIKELY(size > TOP_K_MAX_SIZE)) {
             throw Exception(ErrorCode::INTERNAL_ERROR,
                             "Too large size ({}) for aggregate function '{}' 
state (maximum is {})",
@@ -114,8 +114,8 @@ public:
             auto ref = readStringBinaryInto(*arena, buf);
             uint64_t count = 0;
             uint64_t error = 0;
-            read_var_uint(count, buf);
-            read_var_uint(error, buf);
+            buf.read_var_uint(count);
+            buf.read_var_uint(error);
             set.insert(ref, count, error);
             arena->rollback(ref.size);
         }
@@ -123,15 +123,15 @@ public:
         set.read_alpha_map(buf);
 
         uint64_t column_size = 0;
-        read_var_uint(column_size, buf);
+        buf.read_var_uint(column_size);
         _column_names.clear();
         for (uint64_t i = 0; i < column_size; i++) {
             std::string column_name;
-            read_string_binary(column_name, buf);
+            buf.read_binary(column_name);
             _column_names.emplace_back(std::move(column_name));
         }
-        read_var_uint(_threshold, buf);
-        read_var_uint(_reserved, buf);
+        buf.read_var_uint(_threshold);
+        buf.read_var_uint(_reserved);
     }
 
     // Adds a new row of data to the aggregate function (inserts a new value 
into the SpaceSaving structure).
diff --git a/be/src/vec/aggregate_functions/aggregate_function_approx_top_sum.h 
b/be/src/vec/aggregate_functions/aggregate_function_approx_top_sum.h
index 46df8dde22c..ced3ac6f760 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_approx_top_sum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_approx_top_sum.h
@@ -76,12 +76,12 @@ public:
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
         this->data(place).value.write(buf);
 
-        write_var_uint(_column_names.size(), buf);
+        buf.write_var_uint(_column_names.size());
         for (const auto& column_name : _column_names) {
-            write_string_binary(column_name, buf);
+            buf.write_binary(column_name);
         }
-        write_var_uint(_threshold, buf);
-        write_var_uint(_reserved, buf);
+        buf.write_var_uint(_threshold);
+        buf.write_var_uint(_reserved);
     }
 
     // Deserializes the aggregate function's state from a buffer (including 
the SpaceSaving structure and threshold).
@@ -89,7 +89,7 @@ public:
                      Arena* arena) const override {
         auto readStringBinaryInto = [](Arena& arena, BufferReadable& buf) {
             uint64_t size = 0;
-            read_var_uint(size, buf);
+            buf.read_var_uint(size);
 
             if (UNLIKELY(size > DEFAULT_MAX_STRING_SIZE)) {
                 throw Exception(ErrorCode::INTERNAL_ERROR, "Too large string 
size.");
@@ -105,7 +105,7 @@ public:
         set.clear();
 
         uint64_t size = 0;
-        read_var_uint(size, buf);
+        buf.read_var_uint(size);
         if (UNLIKELY(size > TOP_K_MAX_SIZE)) {
             throw Exception(ErrorCode::INTERNAL_ERROR,
                             "Too large size ({}) for aggregate function '{}' 
state (maximum is {})",
@@ -117,8 +117,8 @@ public:
             auto ref = readStringBinaryInto(*arena, buf);
             uint64_t count = 0;
             uint64_t error = 0;
-            read_var_uint(count, buf);
-            read_var_uint(error, buf);
+            buf.read_var_uint(count);
+            buf.read_var_uint(error);
             set.insert(ref, count, error);
             arena->rollback(ref.size);
         }
@@ -126,15 +126,15 @@ public:
         set.read_alpha_map(buf);
 
         uint64_t column_size = 0;
-        read_var_uint(column_size, buf);
+        buf.read_var_uint(column_size);
         _column_names.clear();
         for (uint64_t i = 0; i < column_size; i++) {
             std::string column_name;
-            read_string_binary(column_name, buf);
+            buf.read_binary(column_name);
             _column_names.emplace_back(std::move(column_name));
         }
-        read_var_uint(_threshold, buf);
-        read_var_uint(_reserved, buf);
+        buf.read_var_uint(_threshold);
+        buf.read_var_uint(_reserved);
     }
 
     // Adds a new row of data to the aggregate function (inserts a new value 
into the SpaceSaving structure).
diff --git a/be/src/vec/aggregate_functions/aggregate_function_array_agg.h 
b/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
index 1005053c1b3..f4fa2dfbd53 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
@@ -98,14 +98,14 @@ struct AggregateFunctionArrayAggData {
 
     void write(BufferWritable& buf) const {
         const size_t size = null_map->size();
-        write_binary(size, buf);
+        buf.write_binary(size);
 
         for (size_t i = 0; i < size; i++) {
-            write_binary(null_map->data()[i], buf);
+            buf.write_binary(null_map->data()[i]);
         }
 
         for (size_t i = 0; i < size; i++) {
-            write_binary(nested_column->get_data()[i], buf);
+            buf.write_binary(nested_column->get_data()[i]);
         }
     }
 
@@ -113,16 +113,16 @@ struct AggregateFunctionArrayAggData {
         DCHECK(null_map);
         DCHECK(null_map->empty());
         size_t size = 0;
-        read_binary(size, buf);
+        buf.read_binary(size);
         null_map->resize(size);
         nested_column->reserve(size);
         for (size_t i = 0; i < size; i++) {
-            read_binary(null_map->data()[i], buf);
+            buf.read_binary(null_map->data()[i]);
         }
 
         ElementType data_value;
         for (size_t i = 0; i < size; i++) {
-            read_binary(data_value, buf);
+            buf.read_binary(data_value);
             nested_column->get_data().push_back(data_value);
         }
     }
@@ -201,12 +201,12 @@ struct AggregateFunctionArrayAggData<T> {
 
     void write(BufferWritable& buf) const {
         const size_t size = null_map->size();
-        write_binary(size, buf);
+        buf.write_binary(size);
         for (size_t i = 0; i < size; i++) {
-            write_binary(null_map->data()[i], buf);
+            buf.write_binary(null_map->data()[i]);
         }
         for (size_t i = 0; i < size; i++) {
-            write_string_binary(nested_column->get_data_at(i), buf);
+            buf.write_binary(nested_column->get_data_at(i));
         }
     }
 
@@ -214,16 +214,16 @@ struct AggregateFunctionArrayAggData<T> {
         DCHECK(null_map);
         DCHECK(null_map->empty());
         size_t size = 0;
-        read_binary(size, buf);
+        buf.read_binary(size);
         null_map->resize(size);
         nested_column->reserve(size);
         for (size_t i = 0; i < size; i++) {
-            read_binary(null_map->data()[i], buf);
+            buf.read_binary(null_map->data()[i]);
         }
 
         StringRef s;
         for (size_t i = 0; i < size; i++) {
-            read_string_binary(s, buf);
+            buf.read_binary(s);
             nested_column->insert_data(s.data, s.size);
         }
     }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h 
b/be/src/vec/aggregate_functions/aggregate_function_avg.h
index 14b7e2bea27..1d7472eb358 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_avg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h
@@ -96,13 +96,13 @@ struct AggregateFunctionAvgData {
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(sum, buf);
-        write_binary(count, buf);
+        buf.write_binary(sum);
+        buf.write_binary(count);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(sum, buf);
-        read_binary(count, buf);
+        buf.read_binary(sum);
+        buf.read_binary(count);
     }
 };
 
diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.h 
b/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.h
index b1fa380ba53..0d702c0f811 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_avg_weighted.h
@@ -60,13 +60,13 @@ struct AggregateFunctionAvgWeightedData {
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(data_sum, buf);
-        write_binary(weight_sum, buf);
+        buf.write_binary(data_sum);
+        buf.write_binary(weight_sum);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(data_sum, buf);
-        read_binary(weight_sum, buf);
+        buf.read_binary(data_sum);
+        buf.read_binary(weight_sum);
     }
 
     void merge(const AggregateFunctionAvgWeightedData& rhs) {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_bit.h 
b/be/src/vec/aggregate_functions/aggregate_function_bit.h
index 1798943a8ba..62819cd86df 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_bit.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_bit.h
@@ -45,8 +45,8 @@ struct AggregateFunctionBaseData {
 public:
     AggregateFunctionBaseData(typename PrimitiveTypeTraits<T>::CppType 
init_value)
             : res_bit(init_value) {}
-    void write(BufferWritable& buf) const { write_binary(res_bit, buf); }
-    void read(BufferReadable& buf) { read_binary(res_bit, buf); }
+    void write(BufferWritable& buf) const { buf.write_binary(res_bit); }
+    void read(BufferReadable& buf) { buf.read_binary(res_bit); }
     typename PrimitiveTypeTraits<T>::CppType get() const { return res_bit; }
 
 protected:
diff --git a/be/src/vec/aggregate_functions/aggregate_function_collect.h 
b/be/src/vec/aggregate_functions/aggregate_function_collect.h
index 4b48e6fe89d..50d841ef63e 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_collect.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_collect.h
@@ -83,19 +83,19 @@ struct AggregateFunctionCollectSetData {
     }
 
     void write(BufferWritable& buf) const {
-        write_var_uint(data_set.size(), buf);
+        buf.write_var_uint(data_set.size());
         for (const auto& value : data_set) {
-            write_binary(value, buf);
+            buf.write_binary(value);
         }
         write_var_int(max_size, buf);
     }
 
     void read(BufferReadable& buf) {
         uint64_t new_size = 0;
-        read_var_uint(new_size, buf);
+        buf.read_var_uint(new_size);
         ElementType x;
         for (size_t i = 0; i < new_size; ++i) {
-            read_binary(x, buf);
+            buf.read_binary(x);
             data_set.insert(x);
         }
         read_var_int(max_size, buf);
@@ -153,19 +153,19 @@ struct AggregateFunctionCollectSetData<T, HasLimit> {
     }
 
     void write(BufferWritable& buf) const {
-        write_var_uint(size(), buf);
+        buf.write_var_uint(size());
         for (const auto& elem : data_set) {
-            write_string_binary(elem, buf);
+            buf.write_binary(elem);
         }
         write_var_int(max_size, buf);
     }
 
     void read(BufferReadable& buf) {
         UInt64 size;
-        read_var_uint(size, buf);
+        buf.read_var_uint(size);
         StringRef ref;
         for (size_t i = 0; i < size; ++i) {
-            read_string_binary(ref, buf);
+            buf.read_binary(ref);
             data_set.insert(ref);
         }
         read_var_int(max_size, buf);
@@ -219,14 +219,14 @@ struct AggregateFunctionCollectListData {
     }
 
     void write(BufferWritable& buf) const {
-        write_var_uint(size(), buf);
+        buf.write_var_uint(size());
         buf.write(data.raw_data(), size() * sizeof(ElementType));
         write_var_int(max_size, buf);
     }
 
     void read(BufferReadable& buf) {
         UInt64 rows = 0;
-        read_var_uint(rows, buf);
+        buf.read_var_uint(rows);
         data.resize(rows);
         buf.read(reinterpret_cast<char*>(data.data()), rows * 
sizeof(ElementType));
         read_var_int(max_size, buf);
@@ -278,10 +278,10 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
     void write(BufferWritable& buf) const {
         auto& col = assert_cast<ColVecType&>(*data);
 
-        write_var_uint(col.size(), buf);
+        buf.write_var_uint(col.size());
         buf.write(col.get_offsets().raw_data(), col.size() * 
sizeof(IColumn::Offset));
 
-        write_var_uint(col.get_chars().size(), buf);
+        buf.write_var_uint(col.get_chars().size());
         buf.write(col.get_chars().raw_data(), col.get_chars().size());
         write_var_int(max_size, buf);
     }
@@ -289,13 +289,13 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
     void read(BufferReadable& buf) {
         auto& col = assert_cast<ColVecType&>(*data);
         UInt64 offs_size = 0;
-        read_var_uint(offs_size, buf);
+        buf.read_var_uint(offs_size);
         col.get_offsets().resize(offs_size);
         buf.read(reinterpret_cast<char*>(col.get_offsets().data()),
                  offs_size * sizeof(IColumn::Offset));
 
         UInt64 chars_size = 0;
-        read_var_uint(chars_size, buf);
+        buf.read_var_uint(chars_size);
         col.get_chars().resize(chars_size);
         buf.read(reinterpret_cast<char*>(col.get_chars().data()), chars_size);
         read_var_int(max_size, buf);
@@ -349,7 +349,7 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
 
     void write(BufferWritable& buf) const {
         const size_t size = column_data->size();
-        write_binary(size, buf);
+        buf.write_binary(size);
 
         DataTypeSerDe::FormatOptions opt;
         auto tmp_str = ColumnString::create();
@@ -363,7 +363,7 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
                                                " error: " + st.to_string());
             }
             tmp_buf.commit();
-            write_string_binary(tmp_str->get_data_at(0), buf);
+            buf.write_binary(tmp_str->get_data_at(0));
         }
 
         write_var_int(max_size, buf);
@@ -371,14 +371,14 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
 
     void read(BufferReadable& buf) {
         size_t size = 0;
-        read_binary(size, buf);
+        buf.read_binary(size);
         column_data->clear();
         column_data->reserve(size);
 
         StringRef s;
         DataTypeSerDe::FormatOptions opt;
         for (size_t i = 0; i < size; i++) {
-            read_string_binary(s, buf);
+            buf.read_binary(s);
             Slice slice(s.data, s.size);
             if (Status st = 
serde->deserialize_one_cell_from_json(*column_data, slice, opt); !st) {
                 throw doris::Exception(ErrorCode::INTERNAL_ERROR,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_corr.h 
b/be/src/vec/aggregate_functions/aggregate_function_corr.h
index c3f67ef2b5a..7e3a1591e99 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_corr.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_corr.h
@@ -49,21 +49,21 @@ struct CorrMoment {
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(m0, buf);
-        write_binary(x1, buf);
-        write_binary(y1, buf);
-        write_binary(xy, buf);
-        write_binary(x2, buf);
-        write_binary(y2, buf);
+        buf.write_binary(m0);
+        buf.write_binary(x1);
+        buf.write_binary(y1);
+        buf.write_binary(xy);
+        buf.write_binary(x2);
+        buf.write_binary(y2);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(m0, buf);
-        read_binary(x1, buf);
-        read_binary(y1, buf);
-        read_binary(xy, buf);
-        read_binary(x2, buf);
-        read_binary(y2, buf);
+        buf.read_binary(m0);
+        buf.read_binary(x1);
+        buf.read_binary(y1);
+        buf.read_binary(xy);
+        buf.read_binary(x2);
+        buf.read_binary(y2);
     }
 
     T get() const {
@@ -130,21 +130,21 @@ struct CorrMomentWelford {
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(meanX, buf);
-        write_binary(meanY, buf);
-        write_binary(c2, buf);
-        write_binary(m2X, buf);
-        write_binary(m2Y, buf);
-        write_binary(count, buf);
+        buf.write_binary(meanX);
+        buf.write_binary(meanY);
+        buf.write_binary(c2);
+        buf.write_binary(m2X);
+        buf.write_binary(m2Y);
+        buf.write_binary(count);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(meanX, buf);
-        read_binary(meanY, buf);
-        read_binary(c2, buf);
-        read_binary(m2X, buf);
-        read_binary(m2Y, buf);
-        read_binary(count, buf);
+        buf.read_binary(meanX);
+        buf.read_binary(meanY);
+        buf.read_binary(c2);
+        buf.read_binary(m2X);
+        buf.read_binary(m2Y);
+        buf.read_binary(count);
     }
 
     double get() const {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h 
b/be/src/vec/aggregate_functions/aggregate_function_count.h
index 99e7422b108..57e4bacbdd0 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_count.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_count.h
@@ -74,12 +74,12 @@ public:
     }
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
-        write_var_uint(data(place).count, buf);
+        buf.write_var_uint(data(place).count);
     }
 
     void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
                      Arena*) const override {
-        read_var_uint(data(place).count, buf);
+        buf.read_var_uint(data(place).count);
     }
 
     void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
@@ -221,12 +221,12 @@ public:
     }
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
-        write_var_uint(data(place).count, buf);
+        buf.write_var_uint(data(place).count);
     }
 
     void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
                      Arena*) const override {
-        read_var_uint(data(place).count, buf);
+        buf.read_var_uint(data(place).count);
     }
 
     void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.h 
b/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.h
index 543ae55f872..27b71559f03 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_count_by_enum.h
@@ -125,20 +125,20 @@ struct AggregateFunctionCountByEnumData {
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(data_vec.size(), buf);
+        buf.write_binary(data_vec.size());
 
         for (const auto& data : data_vec) {
             const MapType& unordered_map = data.cbe;
-            write_binary(unordered_map.size(), buf);
+            buf.write_binary(unordered_map.size());
 
             for (const auto& [key, value] : unordered_map) {
-                write_binary(value, buf);
-                write_binary(key, buf);
+                buf.write_binary(value);
+                buf.write_binary(key);
             }
 
-            write_binary(data.not_null, buf);
-            write_binary(data.null, buf);
-            write_binary(data.all, buf);
+            buf.write_binary(data.not_null);
+            buf.write_binary(data.null);
+            buf.write_binary(data.all);
         }
     }
 
@@ -146,27 +146,27 @@ struct AggregateFunctionCountByEnumData {
         data_vec.clear();
 
         uint64_t vec_size_number = 0;
-        read_binary(vec_size_number, buf);
+        buf.read_binary(vec_size_number);
 
         for (int idx = 0; idx < vec_size_number; idx++) {
             uint64_t element_number = 0;
-            read_binary(element_number, buf);
+            buf.read_binary(element_number);
 
             MapType unordered_map;
             unordered_map.reserve(element_number);
             for (auto i = 0; i < element_number; i++) {
                 std::string key;
                 uint64_t value;
-                read_binary(value, buf);
-                read_binary(key, buf);
+                buf.read_binary(value);
+                buf.read_binary(key);
                 unordered_map.emplace(std::move(key), value);
             }
 
             CountByEnumData data;
             data.cbe = std::move(unordered_map);
-            read_binary(data.not_null, buf);
-            read_binary(data.null, buf);
-            read_binary(data.all, buf);
+            buf.read_binary(data.not_null);
+            buf.read_binary(data.null);
+            buf.read_binary(data.all);
             data_vec.emplace_back(std::move(data));
         }
     }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_covar.h 
b/be/src/vec/aggregate_functions/aggregate_function_covar.h
index 81c4973ddfa..04bd3760374 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_covar.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_covar.h
@@ -51,17 +51,17 @@ struct BaseData {
     static DataTypePtr get_return_type() { return 
std::make_shared<DataTypeFloat64>(); }
 
     void write(BufferWritable& buf) const {
-        write_binary(sum_x, buf);
-        write_binary(sum_y, buf);
-        write_binary(sum_xy, buf);
-        write_binary(count, buf);
+        buf.write_binary(sum_x);
+        buf.write_binary(sum_y);
+        buf.write_binary(sum_xy);
+        buf.write_binary(count);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(sum_x, buf);
-        read_binary(sum_y, buf);
-        read_binary(sum_xy, buf);
-        read_binary(count, buf);
+        buf.read_binary(sum_x);
+        buf.read_binary(sum_y);
+        buf.read_binary(sum_xy);
+        buf.read_binary(count);
     }
 
     void reset() {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.h 
b/be/src/vec/aggregate_functions/aggregate_function_distinct.h
index 616d8a1e9a9..c697f3be0f2 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_distinct.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.h
@@ -88,9 +88,9 @@ struct AggregateFunctionDistinctSingleNumericData {
     void serialize(BufferWritable& buf) const {
         DCHECK(!stable);
         if constexpr (!stable) {
-            write_var_uint(data.size(), buf);
+            buf.write_var_uint(data.size());
             for (const auto& value : data) {
-                write_binary(value, buf);
+                buf.write_binary(value);
             }
         }
     }
@@ -99,10 +99,10 @@ struct AggregateFunctionDistinctSingleNumericData {
         DCHECK(!stable);
         if constexpr (!stable) {
             uint64_t new_size = 0;
-            read_var_uint(new_size, buf);
+            buf.read_var_uint(new_size);
             typename PrimitiveTypeTraits<T>::CppType x;
             for (size_t i = 0; i < new_size; ++i) {
-                read_binary(x, buf);
+                buf.read_binary(x);
                 data.insert(x);
             }
         }
@@ -153,9 +153,9 @@ struct AggregateFunctionDistinctGenericData {
     void serialize(BufferWritable& buf) const {
         DCHECK(!stable);
         if constexpr (!stable) {
-            write_var_uint(data.size(), buf);
+            buf.write_var_uint(data.size());
             for (const auto& elem : data) {
-                write_string_binary(elem, buf);
+                buf.write_binary(elem);
             }
         }
     }
@@ -164,11 +164,11 @@ struct AggregateFunctionDistinctGenericData {
         DCHECK(!stable);
         if constexpr (!stable) {
             UInt64 size;
-            read_var_uint(size, buf);
+            buf.read_var_uint(size);
 
             StringRef ref;
             for (size_t i = 0; i < size; ++i) {
-                read_string_binary(ref, buf);
+                buf.read_binary(ref);
                 data.insert(ref);
             }
         }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_foreach.h 
b/be/src/vec/aggregate_functions/aggregate_function_foreach.h
index 9a4801c43bd..35e9a7271cf 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_foreach.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_foreach.h
@@ -180,7 +180,7 @@ public:
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
         const AggregateFunctionForEachData& state = data(place);
-        write_binary(state.dynamic_array_size, buf);
+        buf.write_binary(state.dynamic_array_size);
         const char* nested_state = state.array_of_aggregate_datas;
         for (size_t i = 0; i < state.dynamic_array_size; ++i) {
             nested_function->serialize(nested_state, buf);
@@ -193,7 +193,7 @@ public:
         AggregateFunctionForEachData& state = data(place);
 
         size_t new_size = 0;
-        read_binary(new_size, buf);
+        buf.read_binary(new_size);
 
         ensure_aggregate_data(place, new_size, *arena);
 
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h 
b/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h
index f0111e6443c..fac2c7d060f 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h
@@ -222,15 +222,15 @@ public:
         auto& init = data.init;
         const bool is_set_contain_null = set->contain_null();
 
-        write_pod_binary(is_set_contain_null, buf);
-        write_pod_binary(init, buf);
-        write_var_uint(set->size(), buf);
+        buf.write_binary(is_set_contain_null);
+        buf.write_binary(init);
+        buf.write_var_uint(set->size());
         HybridSetBase::IteratorBase* it = set->begin();
 
         while (it->has_next()) {
             const typename PrimitiveTypeTraits<T>::CppType* value_ptr =
                     static_cast<const typename 
PrimitiveTypeTraits<T>::CppType*>(it->get_value());
-            write_int_binary((*value_ptr), buf);
+            buf.write_binary((*value_ptr));
             it->next();
         }
     }
@@ -240,15 +240,15 @@ public:
         auto& data = this->data(place);
         bool is_set_contain_null;
 
-        read_pod_binary(is_set_contain_null, buf);
+        buf.read_binary(is_set_contain_null);
         data.value->change_contain_null_value(is_set_contain_null);
-        read_pod_binary(data.init, buf);
+        buf.read_binary(data.init);
         UInt64 size;
-        read_var_uint(size, buf);
+        buf.read_var_uint(size);
 
         typename PrimitiveTypeTraits<T>::CppType element;
         for (UInt64 i = 0; i < size; ++i) {
-            read_int_binary(element, buf);
+            buf.read_binary(element);
             data.value->insert(static_cast<void*>(&element));
         }
     }
@@ -461,14 +461,14 @@ public:
         auto& init = data.init;
         const bool is_set_contain_null = set->contain_null();
 
-        write_pod_binary(is_set_contain_null, buf);
-        write_pod_binary(init, buf);
-        write_var_uint(set->size(), buf);
+        buf.write_binary(is_set_contain_null);
+        buf.write_binary(init);
+        buf.write_var_uint(set->size());
 
         HybridSetBase::IteratorBase* it = set->begin();
         while (it->has_next()) {
             const auto* value = reinterpret_cast<const 
StringRef*>(it->get_value());
-            write_string_binary(*value, buf);
+            buf.write_binary(*value);
             it->next();
         }
     }
@@ -478,15 +478,15 @@ public:
         auto& data = this->data(place);
         bool is_set_contain_null;
 
-        read_pod_binary(is_set_contain_null, buf);
+        buf.read_binary(is_set_contain_null);
         data.value->change_contain_null_value(is_set_contain_null);
-        read_pod_binary(data.init, buf);
+        buf.read_binary(data.init);
         UInt64 size;
-        read_var_uint(size, buf);
+        buf.read_var_uint(size);
 
         StringRef element;
         for (UInt64 i = 0; i < size; ++i) {
-            element = read_string_binary_into(*arena, buf);
+            element = read_binary_into(*arena, buf);
             data.value->insert((void*)element.data, element.size);
         }
     }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_group_concat.h 
b/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
index 5f598e411b2..98b6503e167 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
@@ -91,15 +91,15 @@ struct AggregateFunctionGroupConcatData {
     StringRef get() const { return StringRef {data.data(), data.size()}; }
 
     void write(BufferWritable& buf) const {
-        write_binary(StringRef {data.data(), data.size()}, buf);
-        write_binary(separator, buf);
-        write_binary(inited, buf);
+        buf.write_binary(data);
+        buf.write_binary(separator);
+        buf.write_binary(inited);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(data, buf);
-        read_binary(separator, buf);
-        read_binary(inited, buf);
+        buf.read_binary(data);
+        buf.read_binary(separator);
+        buf.read_binary(inited);
     }
 
     void reset() {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_histogram.h 
b/be/src/vec/aggregate_functions/aggregate_function_histogram.h
index 2ca9856e7d9..a6b8a413063 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_histogram.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_histogram.h
@@ -94,30 +94,30 @@ struct AggregateFunctionHistogramData {
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(max_num_buckets, buf);
+        buf.write_binary(max_num_buckets);
         auto element_number = (size_t)ordered_map.size();
-        write_binary(element_number, buf);
+        buf.write_binary(element_number);
 
         auto pair_vector = map_to_vector();
 
         for (auto i = 0; i < element_number; i++) {
             auto element = pair_vector[i];
-            write_binary(element.second, buf);
-            write_binary(element.first, buf);
+            buf.write_binary(element.second);
+            buf.write_binary(element.first);
         }
     }
 
     void read(BufferReadable& buf) {
-        read_binary(max_num_buckets, buf);
+        buf.read_binary(max_num_buckets);
 
         size_t element_number = 0;
-        read_binary(element_number, buf);
+        buf.read_binary(element_number);
 
         ordered_map.clear();
         std::pair<typename PrimitiveTypeTraits<T>::ColumnItemType, size_t> 
element;
         for (auto i = 0; i < element_number; i++) {
-            read_binary(element.first, buf);
-            read_binary(element.second, buf);
+            buf.read_binary(element.first);
+            buf.read_binary(element.second);
             ordered_map.insert(element);
         }
     }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_hll_union_agg.h 
b/be/src/vec/aggregate_functions/aggregate_function_hll_union_agg.h
index ef4eae8d7bc..f7e2fda4161 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_hll_union_agg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_hll_union_agg.h
@@ -59,12 +59,12 @@ struct AggregateFunctionHLLData {
     void write(BufferWritable& buf) const {
         std::string result(dst_hll.max_serialized_size(), '0');
         result.resize(dst_hll.serialize((uint8_t*)result.c_str()));
-        write_binary(result, buf);
+        buf.write_binary(result);
     }
 
     void read(BufferReadable& buf) {
         StringRef ref;
-        read_binary(ref, buf);
+        buf.read_binary(ref);
         dst_hll.deserialize(Slice(ref.data, ref.size));
     }
 
diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h 
b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
index f8815c30d70..f4b23786586 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
@@ -170,7 +170,7 @@ public:
         int len = env->GetArrayLength(arr);
         serialize_data.resize(len);
         env->GetByteArrayRegion(arr, 0, len, 
reinterpret_cast<jbyte*>(serialize_data.data()));
-        write_binary(serialize_data, buf);
+        buf.write_binary(serialize_data);
         jbyte* pBytes = env->GetByteArrayElements(arr, nullptr);
         env->ReleaseByteArrayElements(arr, pBytes, JNI_ABORT);
         env->DeleteLocalRef(arr);
@@ -184,7 +184,7 @@ public:
         return JniUtil::GetJniExceptionMsg(env);
     }
 
-    void read(BufferReadable& buf) { read_binary(serialize_data, buf); }
+    void read(BufferReadable& buf) { buf.read_binary(serialize_data); }
 
     Status destroy() {
         JNIEnv* env = nullptr;
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_linear_histogram.h 
b/be/src/vec/aggregate_functions/aggregate_function_linear_histogram.h
index 51440687d3f..8d7b7e100c1 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_linear_histogram.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_linear_histogram.h
@@ -100,30 +100,30 @@ public:
 
     // write
     void write(BufferWritable& buf) const {
-        write_binary(offset, buf);
-        write_binary(interval, buf);
-        write_binary(lower, buf);
-        write_binary(upper, buf);
-        write_binary(buckets.size(), buf);
+        buf.write_binary(offset);
+        buf.write_binary(interval);
+        buf.write_binary(lower);
+        buf.write_binary(upper);
+        buf.write_binary(buckets.size());
         for (const auto& [key, count] : buckets) {
-            write_binary(key, buf);
-            write_binary(count, buf);
+            buf.write_binary(key);
+            buf.write_binary(count);
         }
     }
 
     // read
     void read(BufferReadable& buf) {
-        read_binary(offset, buf);
-        read_binary(interval, buf);
-        read_binary(lower, buf);
-        read_binary(upper, buf);
+        buf.read_binary(offset);
+        buf.read_binary(interval);
+        buf.read_binary(lower);
+        buf.read_binary(upper);
         size_t size;
-        read_binary(size, buf);
+        buf.read_binary(size);
         for (size_t i = 0; i < size; i++) {
             int32_t key;
             size_t count;
-            read_binary(key, buf);
-            read_binary(count, buf);
+            buf.read_binary(key);
+            buf.read_binary(count);
             buckets[key] = count;
         }
     }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_map.h 
b/be/src/vec/aggregate_functions/aggregate_function_map.h
index d23615e0ebe..d8bc506ce5d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_map.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_map.h
@@ -140,23 +140,22 @@ struct AggregateFunctionMapAggData {
 
     void write(BufferWritable& buf) const {
         const size_t size = _key_column->size();
-        write_binary(size, buf);
+        buf.write_binary(size);
         for (size_t i = 0; i < size; i++) {
-            write_binary(assert_cast<KeyColumnType&, 
TypeCheckOnRelease::DISABLE>(*_key_column)
-                                 .get_data_at(i),
-                         buf);
+            buf.write_binary(assert_cast<KeyColumnType&, 
TypeCheckOnRelease::DISABLE>(*_key_column)
+                                     .get_data_at(i));
         }
         for (size_t i = 0; i < size; i++) {
-            write_binary(_value_column->get_data_at(i), buf);
+            buf.write_binary(_value_column->get_data_at(i));
         }
     }
 
     void read(BufferReadable& buf) {
         size_t size = 0;
-        read_binary(size, buf);
+        buf.read_binary(size);
         StringRef key;
         for (size_t i = 0; i < size; i++) {
-            read_binary(key, buf);
+            buf.read_binary(key);
             DCHECK(_map.find(key) == _map.cend());
             key.data = _arena.insert(key.data, key.size);
             assert_cast<KeyColumnType&, 
TypeCheckOnRelease::DISABLE>(*_key_column)
@@ -164,7 +163,7 @@ struct AggregateFunctionMapAggData {
         }
         StringRef val;
         for (size_t i = 0; i < size; i++) {
-            read_binary(val, buf);
+            buf.read_binary(val);
             _value_column->insert_data(val.data, val.size);
         }
     }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_map_v2.h 
b/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
index 53290056fa5..620873cf38d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
@@ -131,7 +131,7 @@ struct AggregateFunctionMapAggDataV2 {
         DCHECK_LE(written_bytes, serialized_bytes);
 
         serialized_buffer.resize(serialized_bytes);
-        write_string_binary(serialized_buffer, buf);
+        buf.write_binary(serialized_buffer);
 
         serialized_bytes =
                 _value_type->get_uncompressed_serialized_bytes(*_value_column, 
_be_version);
@@ -143,20 +143,20 @@ struct AggregateFunctionMapAggDataV2 {
         DCHECK_LE(written_bytes, serialized_bytes);
 
         serialized_buffer.resize(written_bytes);
-        write_string_binary(serialized_buffer, buf);
+        buf.write_binary(serialized_buffer);
     }
 
     void read(BufferReadable& buf) {
         std::string deserialized_buffer;
 
-        read_string_binary(deserialized_buffer, buf);
+        buf.read_binary(deserialized_buffer);
 
         const auto* ptr =
                 _key_type->deserialize(deserialized_buffer.data(), 
&_key_column, _be_version);
         auto read_bytes = ptr - deserialized_buffer.data();
         DCHECK_EQ(read_bytes, deserialized_buffer.size());
 
-        read_string_binary(deserialized_buffer, buf);
+        buf.read_binary(deserialized_buffer);
 
         ptr = _value_type->deserialize(deserialized_buffer.data(), 
&_value_column, _be_version);
         read_bytes = ptr - deserialized_buffer.data();
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h 
b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
index fafbc2b36c0..02875e45d7e 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
@@ -109,16 +109,16 @@ public:
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(has(), buf);
+        buf.write_binary(has());
         if (has()) {
-            write_binary(value, buf);
+            buf.write_binary(value);
         }
     }
 
     void read(BufferReadable& buf, Arena*) {
-        read_binary(has_value, buf);
+        buf.read_binary(has_value);
         if (has()) {
-            read_binary(value, buf);
+            buf.read_binary(value);
         }
     }
 
@@ -248,16 +248,16 @@ public:
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(has(), buf);
+        buf.write_binary(has());
         if (has()) {
-            write_binary(value, buf);
+            buf.write_binary(value);
         }
     }
 
     void read(BufferReadable& buf, Arena*) {
-        read_binary(has_value, buf);
+        buf.read_binary(has_value);
         if (has()) {
-            read_binary(value, buf);
+            buf.read_binary(value);
         }
     }
 
@@ -384,7 +384,7 @@ public:
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(size, buf);
+        buf.write_binary(size);
         if (has()) {
             buf.write(get_data(), size);
         }
@@ -392,7 +392,7 @@ public:
 
     void read(BufferReadable& buf, Arena*) {
         Int32 rhs_size;
-        read_binary(rhs_size, buf);
+        buf.read_binary(rhs_size);
 
         if (rhs_size >= 0) {
             if (rhs_size <= MAX_SMALL_STRING_SIZE) {
@@ -630,7 +630,7 @@ struct SingleValueDataComplexType {
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(has(), buf);
+        buf.write_binary(has());
         if (!has()) {
             return;
         }
@@ -638,17 +638,17 @@ struct SingleValueDataComplexType {
                 column_type->get_uncompressed_serialized_bytes(*column_data, 
be_exec_version);
         std::string memory_buffer(size_bytes, '0');
         auto* p = column_type->serialize(*column_data, memory_buffer.data(), 
be_exec_version);
-        write_binary(memory_buffer, buf);
+        buf.write_binary(memory_buffer);
         DCHECK_EQ(p, memory_buffer.data() + size_bytes);
     }
 
     void read(BufferReadable& buf, Arena* arena) {
-        read_binary(has_value, buf);
+        buf.read_binary(has_value);
         if (!has()) {
             return;
         }
         std::string memory_buffer;
-        read_binary(memory_buffer, buf);
+        buf.read_binary(memory_buffer);
         const auto* p =
                 column_type->deserialize(memory_buffer.data(), &column_data, 
be_exec_version);
         DCHECK_EQ(p, memory_buffer.data() + memory_buffer.size());
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h 
b/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h
index 0adcaabd9c3..f76448465cc 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h
@@ -59,14 +59,14 @@ public:
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(has(), buf);
+        buf.write_binary(has());
         if (has()) {
             DataTypeBitMap::serialize_as_stream(value, buf);
         }
     }
 
     void read(BufferReadable& buf, Arena*) {
-        read_binary(has_value, buf);
+        buf.read_binary(has_value);
         if (has()) {
             DataTypeBitMap::deserialize_as_stream(value, buf);
         }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h 
b/be/src/vec/aggregate_functions/aggregate_function_null.h
index 274a39d8409..62c2b4ddba1 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_null.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_null.h
@@ -135,7 +135,7 @@ public:
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
         bool flag = get_flag(place);
         if (result_is_nullable) {
-            write_binary(flag, buf);
+            buf.write_binary(flag);
         }
         if (flag) {
             nested_function->serialize(nested_place(place), buf);
@@ -146,7 +146,7 @@ public:
                      Arena* arena) const override {
         bool flag = true;
         if (result_is_nullable) {
-            read_binary(flag, buf);
+            buf.read_binary(flag);
         }
         if (flag) {
             set_flag(place);
@@ -158,7 +158,7 @@ public:
                                BufferReadable& buf, Arena* arena) const 
override {
         bool flag = true;
         if (result_is_nullable) {
-            read_binary(flag, buf);
+            buf.read_binary(flag);
         }
         if (flag) {
             set_flag(rhs);
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_orthogonal_bitmap.h 
b/be/src/vec/aggregate_functions/aggregate_function_orthogonal_bitmap.h
index b2dcca99a6a..1fc6f8a3985 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_orthogonal_bitmap.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_orthogonal_bitmap.h
@@ -122,13 +122,13 @@ public:
     }
 
     void write(BufferWritable& buf) {
-        write_binary(AggOrthBitmapBaseData<T>::first_init, buf);
+        buf.write_binary(AggOrthBitmapBaseData<T>::first_init);
         result = AggOrthBitmapBaseData<T>::bitmap.intersect();
         DataTypeBitMap::serialize_as_stream(result, buf);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(AggOrthBitmapBaseData<T>::first_init, buf);
+        buf.read_binary(AggOrthBitmapBaseData<T>::first_init);
         DataTypeBitMap::deserialize_as_stream(result, buf);
     }
 
@@ -158,17 +158,17 @@ public:
     }
 
     void write(BufferWritable& buf) {
-        write_binary(AggOrthBitmapBaseData<T>::first_init, buf);
+        buf.write_binary(AggOrthBitmapBaseData<T>::first_init);
         std::string data;
         data.resize(AggOrthBitmapBaseData<T>::bitmap.size());
         AggOrthBitmapBaseData<T>::bitmap.serialize(data.data());
-        write_binary(data, buf);
+        buf.write_binary(data);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(AggOrthBitmapBaseData<T>::first_init, buf);
+        buf.read_binary(AggOrthBitmapBaseData<T>::first_init);
         std::string data;
-        read_binary(data, buf);
+        buf.read_binary(data);
         AggOrthBitmapBaseData<T>::bitmap.deserialize(data.data());
     }
 
@@ -199,14 +199,14 @@ public:
     }
 
     void write(BufferWritable& buf) {
-        write_binary(AggOrthBitmapBaseData<T>::first_init, buf);
+        buf.write_binary(AggOrthBitmapBaseData<T>::first_init);
         result = AggOrthBitmapBaseData<T>::bitmap.intersect_count();
-        write_binary(result, buf);
+        buf.write_binary(result);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(AggOrthBitmapBaseData<T>::first_init, buf);
-        read_binary(result, buf);
+        buf.read_binary(AggOrthBitmapBaseData<T>::first_init);
+        buf.read_binary(result);
     }
 
     void get(IColumn& to) const {
@@ -272,13 +272,13 @@ public:
     }
 
     void write(BufferWritable& buf) {
-        write_binary(AggOrthBitmapExprCalBaseData<T>::first_init, buf);
+        buf.write_binary(AggOrthBitmapExprCalBaseData<T>::first_init);
         result = 
AggOrthBitmapExprCalBaseData<T>::bitmap_expr_cal.bitmap_calculate();
         DataTypeBitMap::serialize_as_stream(result, buf);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(AggOrthBitmapExprCalBaseData<T>::first_init, buf);
+        buf.read_binary(AggOrthBitmapExprCalBaseData<T>::first_init);
         DataTypeBitMap::deserialize_as_stream(result, buf);
     }
 
@@ -314,14 +314,14 @@ public:
     }
 
     void write(BufferWritable& buf) {
-        write_binary(AggOrthBitmapExprCalBaseData<T>::first_init, buf);
+        buf.write_binary(AggOrthBitmapExprCalBaseData<T>::first_init);
         result = 
AggOrthBitmapExprCalBaseData<T>::bitmap_expr_cal.bitmap_calculate_count();
-        write_binary(result, buf);
+        buf.write_binary(result);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(AggOrthBitmapExprCalBaseData<T>::first_init, buf);
-        read_binary(result, buf);
+        buf.read_binary(AggOrthBitmapExprCalBaseData<T>::first_init);
+        buf.read_binary(result);
     }
 
     void get(IColumn& to) const {
@@ -357,10 +357,10 @@ struct OrthBitmapUnionCountData {
 
     void write(BufferWritable& buf) {
         result = value.cardinality();
-        write_binary(result, buf);
+        buf.write_binary(result);
     }
 
-    void read(BufferReadable& buf) { read_binary(result, buf); }
+    void read(BufferReadable& buf) { buf.read_binary(result); }
 
     void get(IColumn& to) const {
         auto& column = assert_cast<ColumnInt64&>(to);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile.h 
b/be/src/vec/aggregate_functions/aggregate_function_percentile.h
index 986ba277e17..eb62f1cb655 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_percentile.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_percentile.h
@@ -82,31 +82,31 @@ struct PercentileApproxState {
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(init_flag, buf);
+        buf.write_binary(init_flag);
         if (!init_flag) {
             return;
         }
 
-        write_binary(target_quantile, buf);
-        write_binary(compressions, buf);
+        buf.write_binary(target_quantile);
+        buf.write_binary(compressions);
         uint32_t serialize_size = digest->serialized_size();
         std::string result(serialize_size, '0');
         DCHECK(digest.get() != nullptr);
         digest->serialize((uint8_t*)result.c_str());
 
-        write_binary(result, buf);
+        buf.write_binary(result);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(init_flag, buf);
+        buf.read_binary(init_flag);
         if (!init_flag) {
             return;
         }
 
-        read_binary(target_quantile, buf);
-        read_binary(compressions, buf);
+        buf.read_binary(target_quantile);
+        buf.read_binary(compressions);
         std::string str;
-        read_binary(str, buf);
+        buf.read_binary(str);
         digest = TDigest::create_unique(compressions);
         digest->unserialize((uint8_t*)str.c_str());
     }
@@ -323,14 +323,14 @@ struct PercentileState {
     bool inited_flag = false;
 
     void write(BufferWritable& buf) const {
-        write_binary(inited_flag, buf);
+        buf.write_binary(inited_flag);
         if (!inited_flag) {
             return;
         }
         int size_num = vec_quantile.size();
-        write_binary(size_num, buf);
+        buf.write_binary(size_num);
         for (const auto& quantile : vec_quantile) {
-            write_binary(quantile, buf);
+            buf.write_binary(quantile);
         }
         for (auto& counts : vec_counts) {
             counts.serialize(buf);
@@ -338,16 +338,16 @@ struct PercentileState {
     }
 
     void read(BufferReadable& buf) {
-        read_binary(inited_flag, buf);
+        buf.read_binary(inited_flag);
         if (!inited_flag) {
             return;
         }
         int size_num = 0;
-        read_binary(size_num, buf);
+        buf.read_binary(size_num);
         double data = 0.0;
         vec_quantile.clear();
         for (int i = 0; i < size_num; ++i) {
-            read_binary(data, buf);
+            buf.read_binary(data);
             vec_quantile.emplace_back(data);
         }
         vec_counts.clear();
diff --git a/be/src/vec/aggregate_functions/aggregate_function_product.h 
b/be/src/vec/aggregate_functions/aggregate_function_product.h
index d9e6e6888a8..633d290d791 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_product.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_product.h
@@ -48,9 +48,9 @@ struct AggregateFunctionProductData {
         product *= other.product;
     }
 
-    void write(BufferWritable& buffer) const { write_binary(product, buffer); }
+    void write(BufferWritable& buffer) const { buffer.write_binary(product); }
 
-    void read(BufferReadable& buffer) { read_binary(product, buffer); }
+    void read(BufferReadable& buffer) { buffer.read_binary(product); }
 
     typename PrimitiveTypeTraits<T>::ColumnItemType get() const { return 
product; }
 
@@ -77,9 +77,9 @@ struct AggregateFunctionProductData<TYPE_DECIMALV2> {
         memcpy(&product, &ret, sizeof(Decimal128V2));
     }
 
-    void write(BufferWritable& buffer) const { write_binary(product, buffer); }
+    void write(BufferWritable& buffer) const { buffer.write_binary(product); }
 
-    void read(BufferReadable& buffer) { read_binary(product, buffer); }
+    void read(BufferReadable& buffer) { buffer.read_binary(product); }
 
     Decimal128V2 get() const { return product; }
 
@@ -104,9 +104,9 @@ struct AggregateFunctionProductData<T> {
         product /= multiplier;
     }
 
-    void write(BufferWritable& buffer) const { write_binary(product, buffer); }
+    void write(BufferWritable& buffer) const { buffer.write_binary(product); }
 
-    void read(BufferReadable& buffer) { read_binary(product, buffer); }
+    void read(BufferReadable& buffer) { buffer.read_binary(product); }
 
     typename PrimitiveTypeTraits<T>::ColumnItemType get() const { return 
product; }
 
diff --git a/be/src/vec/aggregate_functions/aggregate_function_regr_union.h 
b/be/src/vec/aggregate_functions/aggregate_function_regr_union.h
index ab28951de1b..7ac90f0e90b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_regr_union.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_regr_union.h
@@ -48,19 +48,19 @@ struct AggregateFunctionRegrData {
     Float64 sum_of_x_squared {};
 
     void write(BufferWritable& buf) const {
-        write_binary(sum_x, buf);
-        write_binary(sum_y, buf);
-        write_binary(sum_of_x_mul_y, buf);
-        write_binary(sum_of_x_squared, buf);
-        write_binary(count, buf);
+        buf.write_binary(sum_x);
+        buf.write_binary(sum_y);
+        buf.write_binary(sum_of_x_mul_y);
+        buf.write_binary(sum_of_x_squared);
+        buf.write_binary(count);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(sum_x, buf);
-        read_binary(sum_y, buf);
-        read_binary(sum_of_x_mul_y, buf);
-        read_binary(sum_of_x_squared, buf);
-        read_binary(count, buf);
+        buf.read_binary(sum_x);
+        buf.read_binary(sum_y);
+        buf.read_binary(sum_of_x_mul_y);
+        buf.read_binary(sum_of_x_squared);
+        buf.read_binary(count);
     }
 
     void reset() {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_rpc.h 
b/be/src/vec/aggregate_functions/aggregate_function_rpc.h
index 27837577801..14a826de0da 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_rpc.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_rpc.h
@@ -246,13 +246,13 @@ public:
         } else {
             LOG(ERROR) << "serialize empty buf";
         }
-        write_binary(serialize_data, buf);
+        buf.write_binary(serialize_data);
     }
 
     void deserialize(BufferReadable& buf) {
         static_cast<void>(send_buffer_to_rpc_server());
         std::string serialize_data;
-        read_binary(serialize_data, buf);
+        buf.read_binary(serialize_data);
         if (error_default_str != serialize_data) {
             _res.ParseFromString(serialize_data);
             set_last_result(true);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sequence_match.h 
b/be/src/vec/aggregate_functions/aggregate_function_sequence_match.h
index 6d4f5c39b9d..adabef0c27d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sequence_match.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sequence_match.h
@@ -137,47 +137,47 @@ public:
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(sorted, buf);
-        write_binary(events_list.size(), buf);
+        buf.write_binary(sorted);
+        buf.write_binary(events_list.size());
 
         for (const auto& events : events_list) {
-            write_binary(events.first, buf);
-            write_binary(events.second.to_ulong(), buf);
+            buf.write_binary(events.first);
+            buf.write_binary(events.second.to_ulong());
         }
 
         // This is std::bitset<32>, which will not exceed 32 bits.
         UInt32 conditions_met_value = (UInt32)conditions_met.to_ulong();
-        write_binary(conditions_met_value, buf);
+        buf.write_binary(conditions_met_value);
 
-        write_binary(pattern, buf);
-        write_binary(arg_count, buf);
+        buf.write_binary(pattern);
+        buf.write_binary(arg_count);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(sorted, buf);
+        buf.read_binary(sorted);
 
         size_t events_list_size;
-        read_binary(events_list_size, buf);
+        buf.read_binary(events_list_size);
 
         events_list.clear();
         events_list.reserve(events_list_size);
 
         for (size_t i = 0; i < events_list_size; ++i) {
             Timestamp timestamp;
-            read_binary(timestamp, buf);
+            buf.read_binary(timestamp);
 
             UInt64 events;
-            read_binary(events, buf);
+            buf.read_binary(events);
 
             events_list.emplace_back(timestamp, Events {events});
         }
 
         UInt32 conditions_met_value;
-        read_binary(conditions_met_value, buf);
+        buf.read_binary(conditions_met_value);
         conditions_met = conditions_met_value;
 
-        read_binary(pattern, buf);
-        read_binary(arg_count, buf);
+        buf.read_binary(pattern);
+        buf.read_binary(arg_count);
     }
 
 private:
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h 
b/be/src/vec/aggregate_functions/aggregate_function_sort.h
index 84cc83cd24b..f574689cf26 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sort.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h
@@ -82,12 +82,12 @@ struct AggregateFunctionSortData {
             throw doris::Exception(st);
         }
 
-        write_string_binary(pblock.SerializeAsString(), buf);
+        buf.write_binary(pblock.SerializeAsString());
     }
 
     void deserialize(BufferReadable& buf) {
         std::string data;
-        read_binary(data, buf);
+        buf.read_binary(data);
 
         PBlock pblock;
         pblock.ParseFromString(data);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_stddev.h 
b/be/src/vec/aggregate_functions/aggregate_function_stddev.h
index 53ab6dcae7d..b04f52c44ca 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_stddev.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_stddev.h
@@ -49,15 +49,15 @@ struct BaseData {
     virtual ~BaseData() = default;
 
     void write(BufferWritable& buf) const {
-        write_binary(mean, buf);
-        write_binary(m2, buf);
-        write_binary(count, buf);
+        buf.write_binary(mean);
+        buf.write_binary(m2);
+        buf.write_binary(count);
     }
 
     void read(BufferReadable& buf) {
-        read_binary(mean, buf);
-        read_binary(m2, buf);
-        read_binary(count, buf);
+        buf.read_binary(mean);
+        buf.read_binary(m2);
+        buf.read_binary(count);
     }
 
     void reset() {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h 
b/be/src/vec/aggregate_functions/aggregate_function_sum.h
index 8e4d647b20c..b35cc215c13 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h
@@ -61,9 +61,9 @@ struct AggregateFunctionSumData {
 
     void merge(const AggregateFunctionSumData& rhs) { sum += rhs.sum; }
 
-    void write(BufferWritable& buf) const { write_binary(sum, buf); }
+    void write(BufferWritable& buf) const { buf.write_binary(sum); }
 
-    void read(BufferReadable& buf) { read_binary(sum, buf); }
+    void read(BufferReadable& buf) { buf.read_binary(sum); }
 
     typename PrimitiveTypeTraits<T>::ColumnItemType get() const { return sum; }
 };
diff --git a/be/src/vec/aggregate_functions/aggregate_function_topn.h 
b/be/src/vec/aggregate_functions/aggregate_function_topn.h
index edddb129f06..01982a3e9b4 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_topn.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_topn.h
@@ -137,33 +137,33 @@ struct AggregateFunctionTopNData {
     }
 
     void write(BufferWritable& buf) const {
-        write_binary(top_num, buf);
-        write_binary(capacity, buf);
+        buf.write_binary(top_num);
+        buf.write_binary(capacity);
 
         uint64_t element_number = std::min(capacity, 
(uint64_t)counter_map.size());
-        write_binary(element_number, buf);
+        buf.write_binary(element_number);
 
         auto counter_vector = get_remain_vector();
 
         for (auto i = 0; i < element_number; i++) {
             auto element = counter_vector[i];
-            write_binary(element.second, buf);
-            write_binary(element.first, buf);
+            buf.write_binary(element.second);
+            buf.write_binary(element.first);
         }
     }
 
     void read(BufferReadable& buf) {
-        read_binary(top_num, buf);
-        read_binary(capacity, buf);
+        buf.read_binary(top_num);
+        buf.read_binary(capacity);
 
         uint64_t element_number = 0;
-        read_binary(element_number, buf);
+        buf.read_binary(element_number);
 
         counter_map.clear();
         std::pair<DataType, uint64_t> element;
         for (auto i = 0; i < element_number; i++) {
-            read_binary(element.first, buf);
-            read_binary(element.second, buf);
+            buf.read_binary(element.first);
+            buf.read_binary(element.second);
             counter_map.insert(element);
         }
     }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_uniq.h 
b/be/src/vec/aggregate_functions/aggregate_function_uniq.h
index b68a202b965..76104e8cdd5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_uniq.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_uniq.h
@@ -202,9 +202,9 @@ public:
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
         auto& set = this->data(place).set;
-        write_var_uint(set.size(), buf);
+        buf.write_var_uint(set.size());
         for (const auto& elem : set) {
-            write_pod_binary(elem, buf);
+            buf.write_binary(elem);
         }
     }
 
@@ -212,13 +212,13 @@ public:
                                BufferReadable& buf, Arena*) const override {
         auto& set = this->data(place).set;
         UInt64 size;
-        read_var_uint(size, buf);
+        buf.read_var_uint(size);
 
         set.rehash(size + set.size());
 
         for (size_t i = 0; i < size; ++i) {
             KeyType ref;
-            read_pod_binary(ref, buf);
+            buf.read_binary(ref);
             set.insert(ref);
         }
     }
@@ -227,13 +227,13 @@ public:
                      Arena*) const override {
         auto& set = this->data(place).set;
         UInt64 size;
-        read_var_uint(size, buf);
+        buf.read_var_uint(size);
 
         set.rehash(size + set.size());
 
         for (size_t i = 0; i < size; ++i) {
             KeyType ref;
-            read_pod_binary(ref, buf);
+            buf.read_binary(ref);
             set.insert(ref);
         }
     }
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h 
b/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
index 16feddaae0f..376db6ff768 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
@@ -153,12 +153,12 @@ public:
     }
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
-        write_var_uint(this->data(place).set.size(), buf);
+        buf.write_var_uint(this->data(place).set.size());
     }
 
     void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
                      Arena*) const override {
-        read_var_uint(this->data(place).count, buf);
+        buf.read_var_uint(this->data(place).count);
     }
 
     void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
diff --git a/be/src/vec/aggregate_functions/moments.h 
b/be/src/vec/aggregate_functions/moments.h
index 0628b3116e2..d4614f4f060 100644
--- a/be/src/vec/aggregate_functions/moments.h
+++ b/be/src/vec/aggregate_functions/moments.h
@@ -53,9 +53,9 @@ struct VarMoments {
         if constexpr (_level >= 4) m[4] += rhs.m[4];
     }
 
-    void write(BufferWritable& buf) const { write_binary(*this, buf); }
+    void write(BufferWritable& buf) const { buf.write_binary(*this); }
 
-    void read(BufferReadable& buf) { read_binary(*this, buf); }
+    void read(BufferReadable& buf) { buf.read_binary(*this); }
 
     T get() const {
         throw doris::Exception(ErrorCode::INTERNAL_ERROR,
diff --git a/be/src/vec/common/hash_table/hash_table.h 
b/be/src/vec/common/hash_table/hash_table.h
index 2330b18fbdf..e00986704d7 100644
--- a/be/src/vec/common/hash_table/hash_table.h
+++ b/be/src/vec/common/hash_table/hash_table.h
@@ -196,12 +196,10 @@ struct HashTableCell {
     void set_mapped(const value_type& /*value*/) {}
 
     /// Serialization, in binary and text form.
-    void write(doris::vectorized::BufferWritable& wb) const {
-        doris::vectorized::write_binary(key, wb);
-    }
+    void write(doris::vectorized::BufferWritable& wb) const { 
wb.write_binary(key); }
 
     /// Deserialization, in binary and text form.
-    void read(doris::vectorized::BufferReadable& rb) { 
doris::vectorized::read_binary(key, rb); }
+    void read(doris::vectorized::BufferReadable& rb) { rb.read_binary(key); }
 };
 
 template <typename Key, typename Hash, typename State>
@@ -948,7 +946,7 @@ public:
 
     void write(doris::vectorized::BufferWritable& wb) const {
         Cell::State::write(wb);
-        doris::vectorized::write_var_uint(m_size, wb);
+        wb.write_var_uint(m_size);
 
         if (this->get_has_zero()) this->zero_value()->write(wb);
 
@@ -964,7 +962,7 @@ public:
         m_size = 0;
 
         doris::vectorized::UInt64 new_size = 0;
-        doris::vectorized::read_var_uint(new_size, rb);
+        rb.read_var_uint(new_size);
 
         free();
         Grower new_grower = grower;
diff --git a/be/src/vec/common/space_saving.h b/be/src/vec/common/space_saving.h
index 4a05ab2b409..85d5d085216 100644
--- a/be/src/vec/common/space_saving.h
+++ b/be/src/vec/common/space_saving.h
@@ -88,15 +88,15 @@ public:
                 : key(k), hash(h), count(c), error(e) {}
 
         void write(BufferWritable& wb) const {
-            write_binary(key, wb);
-            write_var_uint(count, wb);
-            write_var_uint(error, wb);
+            wb.write_binary(key);
+            wb.write_var_uint(count);
+            wb.write_var_uint(error);
         }
 
         void read(BufferReadable& rb) {
-            read_binary(key, rb);
-            read_var_uint(count, rb);
-            read_var_uint(error, rb);
+            rb.read_binary(key);
+            rb.read_var_uint(count);
+            rb.read_var_uint(error);
         }
 
         bool operator>(const Counter& b) const {
@@ -231,21 +231,21 @@ public:
     }
 
     void write(BufferWritable& wb) const {
-        write_var_uint(size(), wb);
+        wb.write_var_uint(size());
         for (auto& counter : counter_list) {
             counter->write(wb);
         }
 
-        write_var_uint(alpha_map.size(), wb);
+        wb.write_var_uint(alpha_map.size());
         for (auto alpha : alpha_map) {
-            write_var_uint(alpha, wb);
+            wb.write_var_uint(alpha);
         }
     }
 
     void read(BufferReadable& rb) {
         destroy_elements();
         uint64_t count = 0;
-        read_var_uint(count, rb);
+        rb.read_var_uint(count);
 
         for (UInt64 i = 0; i < count; ++i) {
             std::unique_ptr counter = std::make_unique<Counter>();
@@ -260,10 +260,10 @@ public:
     // Reads the alpha map data from the provided readable buffer.
     void read_alpha_map(BufferReadable& rb) {
         uint64_t alpha_size = 0;
-        read_var_uint(alpha_size, rb);
+        rb.read_var_uint(alpha_size);
         for (size_t i = 0; i < alpha_size; ++i) {
             uint64_t alpha = 0;
-            read_var_uint(alpha, rb);
+            rb.read_var_uint(alpha);
             alpha_map.push_back(alpha);
         }
     }
diff --git a/be/src/vec/common/string_buffer.hpp 
b/be/src/vec/common/string_buffer.hpp
index 980f70b2dc7..5cd981cc16d 100644
--- a/be/src/vec/common/string_buffer.hpp
+++ b/be/src/vec/common/string_buffer.hpp
@@ -24,6 +24,8 @@
 #include "vec/common/string_ref.h"
 
 namespace doris::vectorized {
+static constexpr size_t DEFAULT_MAX_STRING_SIZE = 1073741824; // 1GB
+static constexpr size_t DEFAULT_MAX_JSON_SIZE = 1073741824;   // 1GB
 
 // store and commit data. only after commit the data is effective on its' 
base(ColumnString)
 // everytime commit, the _data add one row.
@@ -57,6 +59,46 @@ public:
         write(buffer.data(), buffer.size());
     }
 
+    // Write a variable-length unsigned integer to the buffer
+    // maybe it's better not to use this
+    void write_var_uint(UInt64 x) {
+        char bytes[9];
+        uint8_t i = 0;
+        while (i < 9) {
+            uint8_t byte = x & 0x7F;
+            if (x > 0x7F) {
+                byte |= 0x80;
+            }
+
+            bytes[i++] = byte;
+
+            x >>= 7;
+            if (!x) {
+                break;
+            }
+        }
+        write((char*)&i, 1);
+        write(bytes, i);
+    }
+
+    template <typename Type>
+    void write_binary(const Type& x) {
+        static_assert(std::is_standard_layout_v<Type>);
+        write(reinterpret_cast<const char*>(&x), sizeof(x));
+    }
+
+    template <typename Type>
+        requires(std::is_same_v<Type, String> || std::is_same_v<Type, 
PaddedPODArray<UInt8>>)
+    void write_binary(const Type& s) {
+        write_var_uint(s.size());
+        write(reinterpret_cast<const char*>(s.data()), s.size());
+    }
+
+    void write_binary(const StringRef& s) {
+        write_var_uint(s.size);
+        write(s.data, s.size);
+    }
+
 private:
     ColumnString::Chars& _data;
     ColumnString::Offsets& _offsets;
@@ -84,6 +126,62 @@ public:
         _data += len;
     }
 
+    void read_var_uint(UInt64& x) {
+        x = 0;
+        // get length from first byte firstly
+        uint8_t len = 0;
+        read((char*)&len, 1);
+        auto ref = read(len);
+        // read data and set it to x per byte.
+        char* bytes = const_cast<char*>(ref.data);
+        for (size_t i = 0; i < 9; ++i) {
+            UInt64 byte = bytes[i];
+            x |= (byte & 0x7F) << (7 * i);
+
+            if (!(byte & 0x80)) {
+                return;
+            }
+        }
+    }
+
+    template <typename Type>
+    void read_binary(Type& x) {
+        static_assert(std::is_standard_layout_v<Type>);
+        read(reinterpret_cast<char*>(&x), sizeof(x));
+    }
+
+    template <typename Type>
+        requires(std::is_same_v<Type, String> || std::is_same_v<Type, 
PaddedPODArray<UInt8>>)
+    void read_binary(Type& s) {
+        UInt64 size = 0;
+        read_var_uint(size);
+
+        if (size > DEFAULT_MAX_STRING_SIZE) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                   "Too large string size."
+                                   " size: {}, max: {}",
+                                   size, DEFAULT_MAX_STRING_SIZE);
+        }
+
+        s.resize(size);
+        read((char*)s.data(), size);
+    }
+
+    // Note that the StringRef in this function is just a reference, it should 
be copied outside
+    void read_binary(StringRef& s) {
+        UInt64 size = 0;
+        read_var_uint(size);
+
+        if (size > DEFAULT_MAX_STRING_SIZE) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                   "Too large string size. "
+                                   " size: {}, max: {}",
+                                   size, DEFAULT_MAX_STRING_SIZE);
+        }
+
+        s = read(size);
+    }
+
 private:
     const char* _data;
 };
@@ -91,4 +189,15 @@ private:
 using VectorBufferReader = BufferReadable;
 using BufferReader = BufferReadable;
 
+///TODO: Currently this function is only called in one place, we might need to 
convert all read_binary(StringRef) to this style? Or directly use 
read_binary(String)
+inline StringRef read_binary_into(Arena& arena, BufferReadable& buf) {
+    UInt64 size = 0;
+    buf.read_var_uint(size);
+
+    char* data = arena.alloc(size);
+    buf.read(data, size);
+
+    return {data, size};
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_bitmap.cpp 
b/be/src/vec/data_types/data_type_bitmap.cpp
index da7b537c0d3..fc13190e033 100644
--- a/be/src/vec/data_types/data_type_bitmap.cpp
+++ b/be/src/vec/data_types/data_type_bitmap.cpp
@@ -162,12 +162,12 @@ void DataTypeBitMap::serialize_as_stream(const 
BitmapValue& cvalue, BufferWritab
     size_t bytesize = value.getSizeInBytes();
     memory_buffer.resize(bytesize);
     value.write_to(const_cast<char*>(memory_buffer.data()));
-    write_string_binary(memory_buffer, buf);
+    buf.write_binary(memory_buffer);
 }
 
 void DataTypeBitMap::deserialize_as_stream(BitmapValue& value, BufferReadable& 
buf) {
     StringRef ref;
-    read_string_binary(ref, buf);
+    buf.read_binary(ref);
     value.deserialize(ref.data);
 }
 
diff --git a/be/src/vec/data_types/data_type_hll.cpp 
b/be/src/vec/data_types/data_type_hll.cpp
index e543a42c697..6dcd8386e32 100644
--- a/be/src/vec/data_types/data_type_hll.cpp
+++ b/be/src/vec/data_types/data_type_hll.cpp
@@ -180,12 +180,12 @@ void DataTypeHLL::serialize_as_stream(const HyperLogLog& 
cvalue, BufferWritable&
     std::string memory_buffer(value.max_serialized_size(), '0');
     size_t actual_size = value.serialize((uint8_t*)memory_buffer.data());
     memory_buffer.resize(actual_size);
-    write_string_binary(memory_buffer, buf);
+    buf.write_binary(memory_buffer);
 }
 
 void DataTypeHLL::deserialize_as_stream(HyperLogLog& value, BufferReadable& 
buf) {
     std::string str;
-    read_string_binary(str, buf);
+    buf.read_binary(str);
     value.deserialize(Slice(str));
 }
 
diff --git a/be/src/vec/data_types/data_type_quantilestate.cpp 
b/be/src/vec/data_types/data_type_quantilestate.cpp
index ec4d05b819e..a73f0de9b17 100644
--- a/be/src/vec/data_types/data_type_quantilestate.cpp
+++ b/be/src/vec/data_types/data_type_quantilestate.cpp
@@ -163,12 +163,12 @@ void DataTypeQuantileState::serialize_as_stream(const 
QuantileState& cvalue, Buf
     std::string memory_buffer;
     memory_buffer.resize(value.get_serialized_size());
     
value.serialize(const_cast<uint8_t*>(reinterpret_cast<uint8_t*>(memory_buffer.data())));
-    write_string_binary(memory_buffer, buf);
+    buf.write_binary(memory_buffer);
 }
 
 void DataTypeQuantileState::deserialize_as_stream(QuantileState& value, 
BufferReadable& buf) {
     StringRef ref;
-    read_string_binary(ref, buf);
+    buf.read_binary(ref);
     value.deserialize(ref.to_slice());
 }
 
diff --git a/be/src/vec/io/io_helper.h b/be/src/vec/io/io_helper.h
index 8943c2f4fc1..8782a155b2e 100644
--- a/be/src/vec/io/io_helper.h
+++ b/be/src/vec/io/io_helper.h
@@ -41,12 +41,6 @@
 
 namespace doris::vectorized {
 
-// Define in the namespace and avoid defining global macros,
-// because it maybe conflicts with other libs
-static constexpr size_t DEFAULT_MAX_STRING_SIZE = 1073741824; // 1GB
-static constexpr size_t DEFAULT_MAX_JSON_SIZE = 1073741824;   // 1GB
-static constexpr auto WRITE_HELPERS_MAX_INT_WIDTH = 40U;
-
 inline std::string int128_to_string(int128_t value) {
     return fmt::format(FMT_COMPILE("{}"), value);
 }
@@ -96,154 +90,6 @@ void write_text(Decimal<T> value, UInt32 scale, 
std::ostream& ostr) {
         ostr.write(str_fractional.data(), scale);
     }
 }
-/// Methods for output in binary format.
-
-/// Write POD-type in native format. It's recommended to use only with packed 
(dense) data types.
-template <typename Type>
-void write_pod_binary(const Type& x, BufferWritable& buf) {
-    buf.write(reinterpret_cast<const char*>(&x), sizeof(x));
-}
-
-template <typename Type>
-void write_int_binary(const Type& x, BufferWritable& buf) {
-    write_pod_binary(x, buf);
-}
-
-template <typename Type>
-void write_float_binary(const Type& x, BufferWritable& buf) {
-    write_pod_binary(x, buf);
-}
-
-inline void write_string_binary(const std::string& s, BufferWritable& buf) {
-    write_var_uint(s.size(), buf);
-    buf.write(s.data(), s.size());
-}
-
-inline void write_string_binary(const StringRef& s, BufferWritable& buf) {
-    write_var_uint(s.size, buf);
-    buf.write(s.data, s.size);
-}
-
-inline void write_string_binary(const char* s, BufferWritable& buf) {
-    write_string_binary(StringRef {std::string(s)}, buf);
-}
-
-inline void write_json_binary(const JsonbField& s, BufferWritable& buf) {
-    write_string_binary(StringRef {s.get_value(), s.get_size()}, buf);
-}
-
-template <typename Type>
-void write_vector_binary(const std::vector<Type>& v, BufferWritable& buf) {
-    write_var_uint(v.size(), buf);
-
-    for (typename std::vector<Type>::const_iterator it = v.begin(); it != 
v.end(); ++it) {
-        write_binary(*it, buf);
-    }
-}
-
-inline void write_binary(const String& x, BufferWritable& buf) {
-    write_string_binary(x, buf);
-}
-
-inline void write_binary(const StringRef& x, BufferWritable& buf) {
-    write_string_binary(x, buf);
-}
-
-template <typename Type>
-void write_binary(const Type& x, BufferWritable& buf) {
-    write_pod_binary(x, buf);
-}
-
-/// Read POD-type in native format
-template <typename Type>
-void read_pod_binary(Type& x, BufferReadable& buf) {
-    buf.read(reinterpret_cast<char*>(&x), sizeof(x));
-}
-
-template <typename Type>
-void read_int_binary(Type& x, BufferReadable& buf) {
-    read_pod_binary(x, buf);
-}
-
-template <typename Type>
-void read_float_binary(Type& x, BufferReadable& buf) {
-    read_pod_binary(x, buf);
-}
-
-template <typename Type>
-    requires(std::is_same_v<Type, String> || std::is_same_v<Type, 
PaddedPODArray<UInt8>>)
-inline void read_string_binary(Type& s, BufferReadable& buf,
-                               size_t MAX_STRING_SIZE = 
DEFAULT_MAX_STRING_SIZE) {
-    UInt64 size = 0;
-    read_var_uint(size, buf);
-
-    if (size > MAX_STRING_SIZE) {
-        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Too large string 
size.");
-    }
-
-    s.resize(size);
-    buf.read((char*)s.data(), size);
-}
-
-inline void read_string_binary(StringRef& s, BufferReadable& buf,
-                               size_t MAX_STRING_SIZE = 
DEFAULT_MAX_STRING_SIZE) {
-    UInt64 size = 0;
-    read_var_uint(size, buf);
-
-    if (size > MAX_STRING_SIZE) {
-        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Too large string 
size.");
-    }
-
-    s = buf.read(size);
-}
-
-inline StringRef read_string_binary_into(Arena& arena, BufferReadable& buf) {
-    UInt64 size = 0;
-    read_var_uint(size, buf);
-
-    char* data = arena.alloc(size);
-    buf.read(data, size);
-
-    return {data, size};
-}
-
-inline void read_json_binary(JsonbField& val, BufferReadable& buf,
-                             size_t MAX_JSON_SIZE = DEFAULT_MAX_JSON_SIZE) {
-    StringRef result;
-    read_string_binary(result, buf);
-    val = JsonbField(result.data, result.size);
-}
-
-template <typename Type>
-void read_vector_binary(std::vector<Type>& v, BufferReadable& buf,
-                        size_t MAX_VECTOR_SIZE = DEFAULT_MAX_STRING_SIZE) {
-    UInt64 size = 0;
-    read_var_uint(size, buf);
-
-    if (size > MAX_VECTOR_SIZE) {
-        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Too large vector 
size.");
-    }
-
-    v.resize(size);
-    for (size_t i = 0; i < size; ++i) {
-        read_binary(v[i], buf);
-    }
-}
-
-template <typename Type>
-    requires(std::is_same_v<Type, String> || std::is_same_v<Type, 
PaddedPODArray<UInt8>>)
-inline void read_binary(Type& x, BufferReadable& buf) {
-    read_string_binary(x, buf);
-}
-
-inline void read_binary(StringRef& x, BufferReadable& buf) {
-    read_string_binary(x, buf);
-}
-
-template <typename Type>
-void read_binary(Type& x, BufferReadable& buf) {
-    read_pod_binary(x, buf);
-}
 
 template <typename T>
 bool read_float_text_fast_impl(T& x, ReadBuffer& in) {
diff --git a/be/src/vec/io/var_int.h b/be/src/vec/io/var_int.h
index 561e7f59edb..44e07d8e1dd 100644
--- a/be/src/vec/io/var_int.h
+++ b/be/src/vec/io/var_int.h
@@ -24,53 +24,14 @@ namespace doris::vectorized {
 /** Write Int64 in variable length format (base128) */
 template <typename OUT>
 void write_var_int(Int64 x, OUT& ostr) {
-    write_var_uint(static_cast<UInt64>((x << 1) ^ (x >> 63)), ostr);
+    ostr.write_var_uint(static_cast<UInt64>((x << 1) ^ (x >> 63)));
 }
 
 /** Read Int64, written in variable length format (base128) */
 template <typename IN>
 void read_var_int(Int64& x, IN& istr) {
-    read_var_uint(*reinterpret_cast<UInt64*>(&x), istr);
+    istr.read_var_uint(*reinterpret_cast<UInt64*>(&x));
     x = (static_cast<UInt64>(x) >> 1) ^ -(x & 1);
 }
 
-// TODO: do real implement in the future
-inline void read_var_uint(UInt64& x, BufferReadable& buf) {
-    x = 0;
-    // get length from first byte firstly
-    uint8_t len = 0;
-    buf.read((char*)&len, 1);
-    auto ref = buf.read(len);
-    // read data and set it to x per byte.
-    char* bytes = const_cast<char*>(ref.data);
-    for (size_t i = 0; i < 9; ++i) {
-        UInt64 byte = bytes[i];
-        x |= (byte & 0x7F) << (7 * i);
-
-        if (!(byte & 0x80)) {
-            return;
-        }
-    }
-}
-
-inline void write_var_uint(UInt64 x, BufferWritable& ostr) {
-    char bytes[9];
-    uint8_t i = 0;
-    while (i < 9) {
-        uint8_t byte = x & 0x7F;
-        if (x > 0x7F) {
-            byte |= 0x80;
-        }
-
-        bytes[i++] = byte;
-
-        x >>= 7;
-        if (!x) {
-            break;
-        }
-    }
-    ostr.write((char*)&i, 1);
-    ostr.write(bytes, i);
-}
-
 } // namespace doris::vectorized
diff --git a/be/test/vec/common/string_buffer_test.cpp 
b/be/test/vec/common/string_buffer_test.cpp
new file mode 100644
index 00000000000..4c80725b8ad
--- /dev/null
+++ b/be/test/vec/common/string_buffer_test.cpp
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/common/string_buffer.hpp"
+
+#include <gtest/gtest.h>
+
+#include "runtime/primitive_type.h"
+
+namespace doris::vectorized {
+
+TEST(StringBufferTest, TestWriteNumber) {
+    ColumnString column_string;
+    BufferWritable buffer(column_string);
+    buffer.write_number(12345);
+    buffer.write_number(true);
+    buffer.write_number(3.14159);
+    buffer.commit();
+
+    EXPECT_EQ(column_string.size(), 1);
+    auto str_ref = column_string.get_data_at(0);
+
+    EXPECT_EQ(str_ref.to_string(), "12345true3.14159");
+}
+
+TEST(StringBufferTest, TestWriteBinary) {
+    ColumnString column_string;
+    BufferWritable buffer(column_string);
+
+    {
+        String str = "Hello, World!";
+        buffer.write_binary(str);
+    }
+    {
+        int64_t x = 123456789;
+        buffer.write_binary(x);
+    }
+
+    buffer.commit();
+
+    EXPECT_EQ(column_string.size(), 1);
+    auto str_ref = column_string.get_data_at(0);
+
+    BufferReadable readable(str_ref);
+
+    {
+        String read_str;
+        readable.read_binary(read_str);
+        EXPECT_EQ(read_str, "Hello, World!");
+    }
+
+    {
+        int64_t read_x;
+        readable.read_binary(read_x);
+        EXPECT_EQ(read_x, 123456789);
+    }
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/core/field_test.cpp b/be/test/vec/core/field_test.cpp
index 9576f2963e7..bdb3eb7a3fe 100644
--- a/be/test/vec/core/field_test.cpp
+++ b/be/test/vec/core/field_test.cpp
@@ -123,7 +123,7 @@ TEST(VFieldTest, jsonb_field_io) {
     // Write the JsonbField to the buffer
     {
         BufferWritable buf(column_str);
-        write_json_binary(original, buf);
+        buf.write_binary(StringRef {original.get_value(), 
original.get_size()});
         buf.commit(); // Important: commit the write operation
     }
 
@@ -139,8 +139,9 @@ TEST(VFieldTest, jsonb_field_io) {
         BufferReadable read_buf(str_ref);
 
         // Read the data back into a new JsonbField
-        JsonbField read_field;
-        read_json_binary(read_field, read_buf);
+        StringRef result;
+        read_buf.read_binary(result);
+        JsonbField read_field = JsonbField(result.data, result.size);
 
         // Verify the data
         ASSERT_NE(read_field.get_value(), nullptr);
@@ -156,7 +157,7 @@ TEST(VFieldTest, jsonb_field_io) {
         // ser
         {
             BufferWritable field_buf(field_column);
-            write_json_binary(original, field_buf);
+            field_buf.write_binary(StringRef {original.get_value(), 
original.get_size()});
             field_buf.commit();
         }
 
@@ -169,8 +170,9 @@ TEST(VFieldTest, jsonb_field_io) {
             BufferReadable read_field_buf(field_str_ref);
 
             // we can't use read_binary because of the JsonbField is not POD 
type
-            JsonbField jsonb_from_field;
-            read_json_binary(jsonb_from_field, read_field_buf);
+            StringRef result;
+            read_field_buf.read_binary(result);
+            JsonbField jsonb_from_field = JsonbField(result.data, result.size);
             Field f2 = Field::create_field<TYPE_JSONB>(jsonb_from_field);
 
             ASSERT_EQ(f2.get_type(), TYPE_JSONB);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to