This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 1e16352526fa398dc6f3799d7256115c1313a73d
Author: Xinyi Zou <[email protected]>
AuthorDate: Mon Jun 13 20:41:48 2022 +0800

    [fix](brpc) Embed serialized request into the attachment and transmit it 
through http brpc (#9803)
    
    When the length of `Tuple/Block data` is greater than 2G, serialize the
    protoBuf request and embed it, together with the `Tuple/Block data`, into
    the controller attachment, then transmit it through http brpc.
    
    This is to avoid errors when the length of the protoBuf request exceeds 2G:
    `Bad request, error_text=[E1003]Fail to compress request`.
    
    In #7164, `Tuple/Block data` was put into the attachment and sent via the
    default `baidu_std brpc`, but an attachment that exceeds 2G gets truncated.
    There is no 2G limit when sending via `http brpc`.
    
    Also, #7921 considered putting `Tuple/Block data` into the attachment by
    default, since in theory this saves one serialization and improves
    performance. However, testing showed that the performance did not improve,
    while the memory peak increased because of the additional memory copy.
---
 be/src/common/config.h                             |   8 +-
 be/src/exec/tablet_sink.cpp                        |  39 ++++--
 be/src/exec/tablet_sink.h                          |  14 +-
 be/src/runtime/data_stream_sender.cpp              |  51 ++++---
 be/src/runtime/data_stream_sender.h                |  15 +-
 be/src/runtime/row_batch.cpp                       |  52 +++----
 be/src/runtime/row_batch.h                         |   4 +-
 be/src/service/internal_service.cpp                | 156 ++++++++++++++++++---
 be/src/service/internal_service.h                  |  29 ++++
 be/src/util/brpc_client_cache.h                    |  21 ++-
 be/src/util/proto_util.h                           | 110 ++++++++++++++-
 be/src/vec/core/block.cpp                          |  27 +++-
 be/src/vec/core/block.h                            |   3 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  38 +++--
 be/src/vec/sink/vdata_stream_sender.h              |   9 +-
 be/src/vec/sink/vtablet_sink.cpp                   |   2 +
 be/test/vec/core/block_test.cpp                    |  11 +-
 docs/en/administrator-guide/config/be_config.md    |   6 +-
 docs/zh-CN/administrator-guide/config/be_config.md |   6 +-
 gensrc/proto/internal_service.proto                |   6 +
 20 files changed, 457 insertions(+), 150 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index fad7459bef..32d19468bf 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -549,9 +549,11 @@ CONF_String(default_rowset_type, "BETA");
 CONF_Int64(brpc_max_body_size, "3147483648");
 // Max unwritten bytes in each socket, if the limit is reached, Socket.Write 
fails with EOVERCROWDED
 CONF_Int64(brpc_socket_max_unwritten_bytes, "1073741824");
-// Whether to transfer RowBatch in ProtoBuf Request to Controller Attachment 
and send it
-// through brpc, this will be faster and avoid the error of Request length 
overflow.
-CONF_mBool(transfer_data_by_brpc_attachment, "false");
+// TODO(zxy): expect to be true in v1.3
+// Whether to embed the ProtoBuf Request serialized string together with 
Tuple/Block data into
+// Controller Attachment and send it through http brpc when the length of the 
Tuple/Block data
+// is greater than 1.8G. This is to avoid the error of Request length overflow 
(2G).
+CONF_mBool(transfer_large_data_by_brpc, "false");
 
 // max number of txns for every txn_partition_map in txn manager
 // this is a self protection to avoid too many txns saving in manager
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index f40d1b0211..f02d8bd375 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -46,9 +46,6 @@ namespace stream_load {
 
 NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, 
int64_t node_id)
         : _parent(parent), _index_channel(index_channel), _node_id(node_id) {
-    if (_parent->_transfer_data_by_brpc_attachment) {
-        _tuple_data_buffer_ptr = &_tuple_data_buffer;
-    }
 }
 
 NodeChannel::~NodeChannel() noexcept {
@@ -71,6 +68,7 @@ NodeChannel::~NodeChannel() noexcept {
 // returned directly via "TabletSink::prepare()" method.
 Status NodeChannel::init(RuntimeState* state) {
     _tuple_desc = _parent->_output_tuple_desc;
+    _state = state;
     auto node = _parent->_nodes_info->find_node(_node_id);
     if (node == nullptr) {
         std::stringstream ss;
@@ -495,7 +493,7 @@ void NodeChannel::try_send_batch() {
         SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
         Status st = row_batch->serialize(request.mutable_row_batch(), 
&uncompressed_bytes,
-                                         &compressed_bytes, 
_tuple_data_buffer_ptr);
+                                         &compressed_bytes, 
_parent->_transfer_large_data_by_brpc);
         if (!st.ok()) {
             cancel(fmt::format("{}, err: {}", channel_info(), 
st.get_error_msg()));
             _add_batch_closure->clear_in_flight();
@@ -539,14 +537,31 @@ void NodeChannel::try_send_batch() {
         CHECK(_pending_batches_num == 0) << _pending_batches_num;
     }
 
-    if (_parent->_transfer_data_by_brpc_attachment && request.has_row_batch()) 
{
-        request_row_batch_transfer_attachment<PTabletWriterAddBatchRequest,
-                                              
ReusableClosure<PTabletWriterAddBatchResult>>(
-                &request, _tuple_data_buffer, _add_batch_closure);
+    if (_parent->_transfer_large_data_by_brpc && request.has_row_batch() &&
+        request.row_batch().has_tuple_data() && request.ByteSizeLong() > 
MIN_HTTP_BRPC_SIZE) {
+        Status st = request_embed_attachment_contain_tuple<
+                PTabletWriterAddBatchRequest, 
ReusableClosure<PTabletWriterAddBatchResult>>(
+                &request, _add_batch_closure);
+        if (!st.ok()) {
+            cancel(fmt::format("{}, err: {}", channel_info(), 
st.get_error_msg()));
+            _add_batch_closure->clear_in_flight();
+            return;
+        }
+        std::string brpc_url = fmt::format("http://{}:{}", _node_info.host, 
_node_info.brpc_port);
+        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
+                
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
+                                                                               
           "http");
+        _add_batch_closure->cntl.http_request().uri() =
+                brpc_url + 
"/PInternalServiceImpl/tablet_writer_add_batch_by_http";
+        
_add_batch_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+        
_add_batch_closure->cntl.http_request().set_content_type("application/json");
+        _brpc_http_stub->tablet_writer_add_batch_by_http(
+                &_add_batch_closure->cntl, NULL, &_add_batch_closure->result, 
_add_batch_closure);
+    } else {
+        _add_batch_closure->cntl.http_request().Clear();
+        _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
+                                       &_add_batch_closure->result, 
_add_batch_closure);
     }
-    _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, 
&_add_batch_closure->result,
-                                   _add_batch_closure);
-
     _next_packet_seq++;
 }
 
@@ -690,7 +705,7 @@ OlapTableSink::OlapTableSink(ObjectPool* pool, const 
RowDescriptor& row_desc,
         *status = Expr::create_expr_trees(_pool, texprs, &_output_expr_ctxs);
     }
     _name = "OlapTableSink";
-    _transfer_data_by_brpc_attachment = 
config::transfer_data_by_brpc_attachment;
+    _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc;
 }
 
 OlapTableSink::~OlapTableSink() {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index a6af6c15e7..1029cecb4a 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -284,15 +284,6 @@ private:
     std::atomic<int64_t> _queue_push_lock_ns {0};
     std::atomic<int64_t> _actual_consume_ns {0};
 
-    // buffer for saving serialized row batch data.
-    // In the non-attachment approach, we need to use two PRowBatch structures 
alternately
-    // so that when one PRowBatch is sent, the other PRowBatch can be used for 
the serialization of the next RowBatch.
-    // This is not necessary with the attachment approach, because the memory 
structures
-    // are already copied into attachment memory before sending, and will wait 
for
-    // the previous RPC to be fully completed before the next copy.
-    std::string _tuple_data_buffer;
-    std::string* _tuple_data_buffer_ptr = nullptr;
-
     // the timestamp when this node channel be marked closed and finished 
closed
     uint64_t _close_time_ms = 0;
 
@@ -306,6 +297,7 @@ private:
     // The IndexChannel is definitely accessible until the NodeChannel is 
closed.
     std::mutex _closed_lock;
     bool _is_closed = false;
+    RuntimeState* _state;
 };
 
 class IndexChannel {
@@ -492,8 +484,8 @@ protected:
     // Save the status of close() method
     Status _close_status;
 
-    // TODO(cmy): this should be removed after we switch to rpc attachment by 
default.
-    bool _transfer_data_by_brpc_attachment = false;
+    // User can change this config at runtime, avoid it being modified during 
query or loading process.
+    bool _transfer_large_data_by_brpc = false;
 
     // FIND_TABLET_EVERY_ROW is used for both hash and random distribution 
info, which indicates that we
     // should compute tablet index for every row
diff --git a/be/src/runtime/data_stream_sender.cpp 
b/be/src/runtime/data_stream_sender.cpp
index 93b0a128e6..ca095cef64 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -82,6 +82,7 @@ DataStreamSender::Channel::~Channel() {
     }
     // release this before request desctruct
     _brpc_request.release_finst_id();
+    _brpc_request.release_query_id();
 }
 
 Status DataStreamSender::Channel::init(RuntimeState* state) {
@@ -101,6 +102,11 @@ Status DataStreamSender::Channel::init(RuntimeState* 
state) {
     _finst_id.set_hi(_fragment_instance_id.hi);
     _finst_id.set_lo(_fragment_instance_id.lo);
     _brpc_request.set_allocated_finst_id(&_finst_id);
+
+    _query_id.set_hi(state->query_id().hi);
+    _query_id.set_lo(state->query_id().lo);
+    _brpc_request.set_allocated_query_id(&_query_id);
+
     _brpc_request.set_node_id(_dest_node_id);
     _brpc_request.set_sender_id(_parent->_sender_id);
     _brpc_request.set_be_number(_be_number);
@@ -120,6 +126,7 @@ Status DataStreamSender::Channel::init(RuntimeState* state) 
{
             return Status::InternalError(msg);
         }
     }
+    _state = state;
     return Status::OK();
 }
 
@@ -147,12 +154,28 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* 
batch, bool eos) {
     _closure->ref();
     _closure->cntl.set_timeout_ms(_brpc_timeout_ms);
 
-    if (_parent->_transfer_data_by_brpc_attachment && 
_brpc_request.has_row_batch()) {
-        request_row_batch_transfer_attachment<PTransmitDataParams,
-            RefCountClosure<PTransmitDataResult>>(&_brpc_request, 
_parent->_tuple_data_buffer,
-                    _closure);
+    if (_parent->_transfer_large_data_by_brpc && _brpc_request.has_row_batch() 
&&
+        _brpc_request.row_batch().has_tuple_data() &&
+        _brpc_request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
+        Status st = request_embed_attachment_contain_tuple<PTransmitDataParams,
+                                                           
RefCountClosure<PTransmitDataResult>>(
+                &_brpc_request, _closure);
+        RETURN_IF_ERROR(st);
+        std::string brpc_url =
+                fmt::format("http://{}:{}", _brpc_dest_addr.hostname, 
_brpc_dest_addr.port);
+        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
+                
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
+                                                                               
           "http");
+        _closure->cntl.http_request().uri() =
+                brpc_url + "/PInternalServiceImpl/transmit_data_by_http";
+        _closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+        _closure->cntl.http_request().set_content_type("application/json");
+        _brpc_http_stub->transmit_data_by_http(&_closure->cntl, NULL, 
&_closure->result, _closure);
+    } else {
+        _closure->cntl.http_request().Clear();
+        _brpc_stub->transmit_data(&_closure->cntl, &_brpc_request, 
&_closure->result, _closure);
     }
-    _brpc_stub->transmit_data(&_closure->cntl, &_brpc_request, 
&_closure->result, _closure);
+
     if (batch != nullptr) {
         _brpc_request.release_row_batch();
     }
@@ -272,13 +295,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int 
sender_id, const RowDes
           _sender_id(sender_id),
           _serialize_batch_timer(nullptr),
           _bytes_sent_counter(nullptr),
-          _local_bytes_send_counter(nullptr),
-          
_transfer_data_by_brpc_attachment(config::transfer_data_by_brpc_attachment) {
-
-    if (_transfer_data_by_brpc_attachment) {
-        _tuple_data_buffer_ptr = &_tuple_data_buffer; 
-    }
-}
+          _local_bytes_send_counter(nullptr) {}
 
 DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const 
RowDescriptor& row_desc,
                                    const TDataStreamSink& sink,
@@ -297,12 +314,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int 
sender_id, const RowDes
           _part_type(sink.output_partition.type),
           _ignore_not_found(sink.__isset.ignore_not_found ? 
sink.ignore_not_found : true),
           _dest_node_id(sink.dest_node_id),
-          
_transfer_data_by_brpc_attachment(config::transfer_data_by_brpc_attachment) {
-
-    if (_transfer_data_by_brpc_attachment) {
-        _tuple_data_buffer_ptr = &_tuple_data_buffer; 
-    }
-
+          _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
     DCHECK_GT(destinations.size(), 0);
     DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
            sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -668,7 +680,8 @@ Status DataStreamSender::serialize_batch(RowBatch* src, 
PRowBatch* dest, int num
     {
         SCOPED_TIMER(_serialize_batch_timer);
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, 
&compressed_bytes, _tuple_data_buffer_ptr));
+        RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, 
&compressed_bytes,
+                                       _transfer_large_data_by_brpc));
         COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
         COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * 
num_receivers);
     }
diff --git a/be/src/runtime/data_stream_sender.h 
b/be/src/runtime/data_stream_sender.h
index 1da9d32026..377d13f1fa 100644
--- a/be/src/runtime/data_stream_sender.h
+++ b/be/src/runtime/data_stream_sender.h
@@ -197,8 +197,9 @@ protected:
 
         TNetworkAddress _brpc_dest_addr;
 
-        // TODO(zc): initused for brpc
+        // TODO(zc): init used for brpc
         PUniqueId _finst_id;
+        PUniqueId _query_id;
 
         // serialized batches for broadcasting; we need two so we can write
         // one while the other one is still being sent.
@@ -211,6 +212,7 @@ protected:
         PTransmitDataParams _brpc_request;
         std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
         RefCountClosure<PTransmitDataResult>* _closure = nullptr;
+        RuntimeState* _state;
         int32_t _brpc_timeout_ms = 500;
         // whether the dest can be treated as query statistics transfer chain.
         bool _is_transfer_chain;
@@ -257,14 +259,6 @@ private:
     PRowBatch _pb_batch1;
     PRowBatch _pb_batch2;
 
-    // This buffer is used to store the serialized rowbatch data.
-    // Only works when `config::transfer_data_by_brpc_attachment` is true.
-    // The data in the buffer is copied to the attachment of the brpc when it 
is sent,
-    // to avoid an extra pb serialization in the brpc.
-    // _tuple_data_buffer_ptr will point to _tuple_data_buffer if 
`config::transfer_data_by_brpc_attachment` is true.
-    std::string _tuple_data_buffer;
-    std::string* _tuple_data_buffer_ptr = nullptr;
-
     std::vector<ExprContext*> _partition_expr_ctxs; // compute per-row 
partition values
 
     // map from range value to partition_id
@@ -279,7 +273,8 @@ private:
     // Identifier of the destination plan node.
     PlanNodeId _dest_node_id;
 
-    bool _transfer_data_by_brpc_attachment = false;
+    // User can change this config at runtime, avoid it being modified during 
query or loading process.
+    bool _transfer_large_data_by_brpc = false;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index cdbbebbfed..02d70b16e1 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -237,7 +237,7 @@ RowBatch::~RowBatch() {
 }
 
 Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
-                           size_t* compressed_size, std::string* 
allocated_buf) {
+                           size_t* compressed_size, bool 
allow_transfer_large_data) {
     // num_rows
     output_batch->set_num_rows(_num_rows);
     // row_tuples
@@ -252,19 +252,9 @@ Status RowBatch::serialize(PRowBatch* output_batch, 
size_t* uncompressed_size,
     // is_compressed
     output_batch->set_is_compressed(false);
     // tuple data
-    size_t size = total_byte_size();
-    std::string* mutable_tuple_data = nullptr;
-    if (allocated_buf != nullptr) {
-        allocated_buf->resize(size);
-        // all tuple data will be written in the allocated_buf
-        // instead of tuple_data in PRowBatch
-        mutable_tuple_data = allocated_buf;
-        // tuple_data is a required field
-        output_batch->set_tuple_data("");
-    } else {
-        mutable_tuple_data = output_batch->mutable_tuple_data();
-        mutable_tuple_data->resize(size);
-    }
+    size_t tuple_byte_size = total_byte_size();
+    std::string* mutable_tuple_data = output_batch->mutable_tuple_data();
+    mutable_tuple_data->resize(tuple_byte_size);
 
     // Copy tuple data, including strings, into output_batch (converting string
     // pointers into offsets in the process)
@@ -288,15 +278,15 @@ Status RowBatch::serialize(PRowBatch* output_batch, 
size_t* uncompressed_size,
             mutable_tuple_offsets->Add((int32_t) offset);
             mutable_new_tuple_offsets->Add(offset);
             row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* 
convert_ptrs */ true);
-            CHECK_LE(offset, size);
+            CHECK_LE(offset, tuple_byte_size);
         }
     }
-    CHECK_EQ(offset, size) << "offset: " << offset << " vs. size: " << size;
+    CHECK_EQ(offset, tuple_byte_size) << "offset: " << offset << " vs. size: " 
<< tuple_byte_size;
 
-    if (config::compress_rowbatches && size > 0) {
+    if (config::compress_rowbatches && tuple_byte_size > 0) {
         // Try compressing tuple_data to _compression_scratch, swap if 
compressed data is
         // smaller
-        uint32_t max_compressed_size = snappy::MaxCompressedLength(size);
+        uint32_t max_compressed_size = 
snappy::MaxCompressedLength(tuple_byte_size);
 
         if (_compression_scratch.size() < max_compressed_size) {
             _compression_scratch.resize(max_compressed_size);
@@ -304,33 +294,25 @@ Status RowBatch::serialize(PRowBatch* output_batch, 
size_t* uncompressed_size,
 
         size_t compressed_size = 0;
         char* compressed_output = _compression_scratch.data();
-        snappy::RawCompress(mutable_tuple_data->data(), size, 
compressed_output, &compressed_size);
+        snappy::RawCompress(mutable_tuple_data->data(), tuple_byte_size, 
compressed_output, &compressed_size);
 
-        if (LIKELY(compressed_size < size)) {
+        if (LIKELY(compressed_size < tuple_byte_size)) {
             _compression_scratch.resize(compressed_size);
             mutable_tuple_data->swap(_compression_scratch);
             output_batch->set_is_compressed(true);
         }
 
-        VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << 
compressed_size;
+        VLOG_ROW << "uncompressed size: " << tuple_byte_size << ", compressed 
size: " << compressed_size;
     }
 
     // return compressed and uncompressed size
     size_t pb_size = get_batch_size(*output_batch);
-    if (allocated_buf == nullptr) {
-        *uncompressed_size = pb_size - mutable_tuple_data->size() + size;
-        *compressed_size = pb_size;
-        if (pb_size > std::numeric_limits<int32_t>::max()) {
-            // the protobuf has a hard limit of 2GB for serialized data.
-            return Status::InternalError(
-                    fmt::format("The rowbatch is large than 2GB({}), can not 
send by Protobuf. "
-                                "please set BE config 
'transfer_data_by_brpc_attachment' to true "
-                                "and restart BE.",
-                                pb_size));
-        }
-    } else {
-        *uncompressed_size = pb_size + size;
-        *compressed_size = pb_size + mutable_tuple_data->size();
+    *uncompressed_size = pb_size - mutable_tuple_data->size() + 
tuple_byte_size;
+    *compressed_size = pb_size;
+    if (!allow_transfer_large_data && pb_size > 
std::numeric_limits<int32_t>::max()) {
+        // the protobuf has a hard limit of 2GB for serialized data.
+        return Status::InternalError(fmt::format(
+                "The rowbatch is large than 2GB({}), can not send by 
Protobuf.", pb_size));
     }
     return Status::OK();
 }
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index 070a1e578f..5aad1a0e62 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -351,10 +351,8 @@ public:
     // This function does not reset().
     // Returns the uncompressed serialized size (this will be the true size of 
output_batch
     // if tuple_data is actually uncompressed).
-    // if allocated_buf is not null, the serialized tuple data will be saved 
in this buf
-    // instead of `tuple_data` in PRowBatch.
     Status serialize(PRowBatch* output_batch, size_t* uncompressed_size, 
size_t* compressed_size,
-                     std::string* allocated_buf = nullptr);
+                     bool allow_transfer_large_data = false);
 
     // Utility function: returns total size of batch.
     static size_t get_batch_size(const PRowBatch& batch);
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index c1185591e6..c4c48a3c8b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -42,6 +42,28 @@ namespace doris {
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, 
MetricUnit::NOUNIT);
 
+template <typename T>
+class NewHttpClosure : public ::google::protobuf::Closure {
+public:
+    NewHttpClosure(T* request, google::protobuf::Closure* done) : 
_request(request), _done(done) {}
+    ~NewHttpClosure() {}
+
+    void Run() {
+        if (_request != nullptr) {
+            delete _request;
+            _request = nullptr;
+        }
+        if (_done != nullptr) {
+            _done->Run();
+        }
+        delete this;
+    }
+
+private:
+    T* _request = nullptr;
+    google::protobuf::Closure* _done = nullptr;
+};
+
 template <typename T>
 PInternalServiceImpl<T>::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env), 
_tablet_worker_pool(config::number_tablet_writer_threads, 10240) {
@@ -63,15 +85,51 @@ void 
PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
              << " node=" << request->node_id();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     attachment_transfer_request_row_batch<PTransmitDataParams>(request, cntl);
+
+    _transmit_data(cntl_base, request, response, done, Status::OK());
+}
+
+template <typename T>
+void 
PInternalServiceImpl<T>::transmit_data_by_http(google::protobuf::RpcController* 
cntl_base,
+                                                 const PEmptyRequest* request,
+                                                 PTransmitDataResult* response,
+                                                 google::protobuf::Closure* 
done) {
+    PTransmitDataParams* request_raw = new PTransmitDataParams();
+    google::protobuf::Closure* done_raw =
+            new NewHttpClosure<PTransmitDataParams>(request_raw, done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    Status st = 
attachment_extract_request_contain_tuple<PTransmitDataParams>(request_raw, 
cntl);
+    _transmit_data(cntl_base, request_raw, response, done_raw, st);
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::_transmit_data(google::protobuf::RpcController* 
cntl_base,
+                                          const PTransmitDataParams* request,
+                                          PTransmitDataResult* response,
+                                          google::protobuf::Closure* done,
+                                          const Status& extract_st) {
+    std::string query_id;
+    if (request->has_query_id()) {
+        query_id = print_id(request->query_id());
+        TUniqueId finst_id;
+        finst_id.__set_hi(request->finst_id().hi());
+        finst_id.__set_lo(request->finst_id().lo());
+    }
+    VLOG_ROW << "transmit data: fragment_instance_id=" << 
print_id(request->finst_id())
+             << " query_id=" << query_id << " node=" << request->node_id();
     // The response is accessed when done->Run is called in transmit_data(),
     // give response a default value to avoid null pointers in high 
concurrency.
     Status st;
     st.to_protobuf(response->mutable_status());
-    st = _exec_env->stream_mgr()->transmit_data(request, &done);
-    if (!st.ok()) {
-        LOG(WARNING) << "transmit_data failed, message=" << st.get_error_msg()
-                     << ", fragment_instance_id=" << 
print_id(request->finst_id())
-                     << ", node=" << request->node_id();
+    if (extract_st.ok()) {
+        st = _exec_env->stream_mgr()->transmit_data(request, &done);
+        if (!st.ok()) {
+            LOG(WARNING) << "transmit_data failed, message=" << 
st.get_error_msg()
+                         << ", fragment_instance_id=" << 
print_id(request->finst_id())
+                         << ", node=" << request->node_id();
+        }
+    } else {
+        st = extract_st;
     }
     if (done != nullptr) {
         st.to_protobuf(response->mutable_status());
@@ -133,9 +191,34 @@ void 
PInternalServiceImpl<T>::exec_plan_fragment_start(google::protobuf::RpcCont
 
 template <typename T>
 void 
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcController*
 cntl_base,
-                                                      const 
PTabletWriterAddBatchRequest* request,
-                                                      
PTabletWriterAddBatchResult* response,
-                                                      
google::protobuf::Closure* done) {
+                                                   const 
PTabletWriterAddBatchRequest* request,
+                                                   
PTabletWriterAddBatchResult* response,
+                                                   google::protobuf::Closure* 
done) {
+    _tablet_writer_add_batch(cntl_base, request, response, done);
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::tablet_writer_add_batch_by_http(
+        google::protobuf::RpcController* cntl_base, const 
::doris::PEmptyRequest* request,
+        PTabletWriterAddBatchResult* response, google::protobuf::Closure* 
done) {
+    PTabletWriterAddBatchRequest* request_raw = new 
PTabletWriterAddBatchRequest();
+    google::protobuf::Closure* done_raw =
+            new NewHttpClosure<PTabletWriterAddBatchRequest>(request_raw, 
done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    Status st = 
attachment_extract_request_contain_tuple<PTabletWriterAddBatchRequest>(request_raw,
+                                                                               
        cntl);
+    if (st.ok()) {
+        _tablet_writer_add_batch(cntl_base, request_raw, response, done_raw);
+    } else {
+        st.to_protobuf(response->mutable_status());
+    }
+}
+
+template <typename T>
+void 
PInternalServiceImpl<T>::_tablet_writer_add_batch(google::protobuf::RpcController*
 cntl_base,
+                                                    const 
PTabletWriterAddBatchRequest* request,
+                                                    
PTabletWriterAddBatchResult* response,
+                                                    google::protobuf::Closure* 
done) {
     VLOG_RPC << "tablet writer add batch, id=" << request->id()
              << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id()
              << ", current_queued_size=" << 
_tablet_worker_pool.get_queue_size();
@@ -149,8 +232,10 @@ void 
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
         int64_t execution_time_ns = 0;
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
+            // TODO(zxy) delete in 1.2 version
             brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
             
attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, 
cntl);
+
             auto st = _exec_env->load_channel_mgr()->add_batch(*request, 
response);
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add batch failed, message=" << 
st.get_error_msg()
@@ -460,22 +545,57 @@ Status PInternalServiceImpl<T>::_fold_constant_expr(const 
std::string& ser_reque
 
 template <typename T>
 void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* 
cntl_base,
-                                             const PTransmitDataParams* 
request,
-                                             PTransmitDataResult* response,
-                                             google::protobuf::Closure* done) {
-    VLOG_ROW << "transmit data: fragment_instance_id=" << 
print_id(request->finst_id())
-             << " node=" << request->node_id();
+                                          const PTransmitDataParams* request,
+                                          PTransmitDataResult* response,
+                                          google::protobuf::Closure* done) {
+    // TODO(zxy) delete in 1.2 version
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
+
+    _transmit_block(cntl_base, request, response, done, Status::OK());
+}
+
+// HTTP entry point for transmit_block. The protobuf `request` argument is not
+// read here; the actual PTransmitDataParams is rebuilt from the controller
+// attachment and then handed to _transmit_block() together with the extraction
+// status.
+// NOTE(review): NewHttpClosure is presumably responsible for releasing the
+// heap-allocated request and running `done`; confirm against its definition.
+template <typename T>
+void PInternalServiceImpl<T>::transmit_block_by_http(google::protobuf::RpcController* cntl_base,
+                                                  const PEmptyRequest* request,
+                                                  PTransmitDataResult* response,
+                                                  google::protobuf::Closure* done) {
+    auto* extracted_request = new PTransmitDataParams();
+    google::protobuf::Closure* wrapped_done =
+            new NewHttpClosure<PTransmitDataParams>(extracted_request, done);
+    auto* controller = static_cast<brpc::Controller*>(cntl_base);
+    const Status extract_status =
+            attachment_extract_request_contain_block<PTransmitDataParams>(extracted_request,
+                                                                          controller);
+    _transmit_block(cntl_base, extracted_request, response, wrapped_done, extract_status);
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::_transmit_block(google::protobuf::RpcController* 
cntl_base,
+                                           const PTransmitDataParams* request,
+                                           PTransmitDataResult* response,
+                                           google::protobuf::Closure* done,
+                                           const Status& extract_st) {
+    std::string query_id;
+    if (request->has_query_id()) {
+        query_id = print_id(request->query_id());
+        TUniqueId finst_id;
+        finst_id.__set_hi(request->finst_id().hi());
+        finst_id.__set_lo(request->finst_id().lo());
+    }
+    VLOG_ROW << "transmit block: fragment_instance_id=" << 
print_id(request->finst_id())
+             << " query_id=" << query_id << " node=" << request->node_id();
     // The response is accessed when done->Run is called in transmit_block(),
     // give response a default value to avoid null pointers in high 
concurrency.
     Status st;
     st.to_protobuf(response->mutable_status());
-    st = _exec_env->vstream_mgr()->transmit_block(request, &done);
-    if (!st.ok()) {
-        LOG(WARNING) << "transmit_block failed, message=" << st.get_error_msg()
-                     << ", fragment_instance_id=" << 
print_id(request->finst_id())
-                     << ", node=" << request->node_id();
+    if (extract_st.ok()) {
+        st = _exec_env->vstream_mgr()->transmit_block(request, &done);
+        if (!st.ok()) {
+            LOG(WARNING) << "transmit_block failed, message=" << 
st.get_error_msg()
+                         << ", fragment_instance_id=" << 
print_id(request->finst_id())
+                         << ", node=" << request->node_id();
+        }
+    } else {
+        st = extract_st;
     }
     if (done != nullptr) {
         st.to_protobuf(response->mutable_status());
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index 550773b950..0ca63aa5f9 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -41,6 +41,11 @@ public:
                        ::doris::PTransmitDataResult* response,
                        ::google::protobuf::Closure* done) override;
 
+    void transmit_data_by_http(::google::protobuf::RpcController* controller,
+                               const ::doris::PEmptyRequest* request,
+                               ::doris::PTransmitDataResult* response,
+                               ::google::protobuf::Closure* done) override;
+
     void exec_plan_fragment(google::protobuf::RpcController* controller,
                             const PExecPlanFragmentRequest* request,
                             PExecPlanFragmentResult* result,
@@ -74,6 +79,11 @@ public:
                                  PTabletWriterAddBatchResult* response,
                                  google::protobuf::Closure* done) override;
 
+    void tablet_writer_add_batch_by_http(google::protobuf::RpcController* 
controller,
+                                         const ::doris::PEmptyRequest* request,
+                                         PTabletWriterAddBatchResult* response,
+                                         google::protobuf::Closure* done) 
override;
+
     void tablet_writer_cancel(google::protobuf::RpcController* controller,
                               const PTabletWriterCancelRequest* request,
                               PTabletWriterCancelResult* response,
@@ -105,6 +115,10 @@ public:
                         const ::doris::PTransmitDataParams* request,
                         ::doris::PTransmitDataResult* response,
                         ::google::protobuf::Closure* done) override;
+    void transmit_block_by_http(::google::protobuf::RpcController* controller,
+                                const ::doris::PEmptyRequest* request,
+                                ::doris::PTransmitDataResult* response,
+                                ::google::protobuf::Closure* done) override;
 
     void send_data(google::protobuf::RpcController* controller, const 
PSendDataRequest* request,
                    PSendDataResult* response, google::protobuf::Closure* done) 
override;
@@ -132,6 +146,21 @@ private:
 
     Status _fold_constant_expr(const std::string& ser_request, 
PConstantExprResult* response);
 
+    void _transmit_data(::google::protobuf::RpcController* controller,
+                        const ::doris::PTransmitDataParams* request,
+                        ::doris::PTransmitDataResult* response, 
::google::protobuf::Closure* done,
+                        const Status& extract_st);
+
+    void _transmit_block(::google::protobuf::RpcController* controller,
+                         const ::doris::PTransmitDataParams* request,
+                         ::doris::PTransmitDataResult* response, 
::google::protobuf::Closure* done,
+                         const Status& extract_st);
+
+    void _tablet_writer_add_batch(google::protobuf::RpcController* controller,
+                                  const PTabletWriterAddBatchRequest* request,
+                                  PTabletWriterAddBatchResult* response,
+                                  google::protobuf::Closure* done);
+
 private:
     ExecEnv* _exec_env;
     PriorityThreadPool _tablet_worker_pool;
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index b15a3c6aeb..aef96b6ec6 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -71,9 +71,23 @@ public:
         }
 
         // new one stub and insert into map
+        auto stub = get_new_client_no_cache(host_port);
+        _stub_map.try_emplace_l(
+                host_port, [&stub](typename StubMap<T>::mapped_type& v) { stub 
= v; }, stub);
+        return stub;
+    }
+
+    std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
+                                               const std::string& protocol = 
"baidu_std",
+                                               const std::string& connect_type 
= "") {
         brpc::ChannelOptions options;
         if constexpr (std::is_same_v<T, PFunctionService_Stub>) {
             options.protocol = config::function_service_protocol;
+        } else {
+            options.protocol = protocol;
+        }
+        if (connect_type != "") {
+            options.connection_type = connect_type;
         }
         std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
         int ret_code = 0;
@@ -86,12 +100,7 @@ public:
         if (ret_code) {
             return nullptr;
         }
-        auto stub = std::make_shared<T>(channel.release(),
-                                        
google::protobuf::Service::STUB_OWNS_CHANNEL);
-        _stub_map.try_emplace_l(host_port,
-                                [&stub](typename StubMap<T>::mapped_type& v) { 
stub = v; },
-                                stub);
-        return stub;
+        return std::make_shared<T>(channel.release(), 
google::protobuf::Service::STUB_OWNS_CHANNEL);
     }
 
     inline size_t size() { return _stub_map.size(); }
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index 52dcec1861..0144d87396 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -17,10 +17,16 @@
 
 #pragma once
 
-#include "util/stack_util.h"
-
 namespace doris {
 
+// When the tuple/block data is greater than 2G, embed the tuple/block data
+// and the request serialization string in the attachment, and use "http" brpc.
+// "http" brpc requires that only one of request and attachment be non-null.
+//
+// 2G: In the default "baidu_std" brpc, the upper limit of the request and
+// attachment length is 2G.
+constexpr size_t MIN_HTTP_BRPC_SIZE = (1ULL << 31);
+
+// TODO(zxy) delete in v1.3 version
 // Transfer RowBatch in ProtoBuf Request to Controller Attachment.
 // This can avoid reaching the upper limit of the ProtoBuf Request length (2G),
 // and it is expected that performance can be improved.
@@ -34,6 +40,7 @@ inline void request_row_batch_transfer_attachment(Params* 
brpc_request, const st
     closure->cntl.request_attachment().swap(attachment);
 }
 
+// TODO(zxy) delete in v1.3 version
 // Transfer Block in ProtoBuf Request to Controller Attachment.
 // This can avoid reaching the upper limit of the ProtoBuf Request length (2G),
 // and it is expected that performance can be improved.
@@ -47,6 +54,7 @@ inline void request_block_transfer_attachment(Params* 
brpc_request, const std::s
     closure->cntl.request_attachment().swap(attachment);
 }
 
+// TODO(zxy) delete in v1.3 version
 // Controller Attachment transferred to RowBatch in ProtoBuf Request.
 template <typename Params>
 inline void attachment_transfer_request_row_batch(const Params* brpc_request, 
brpc::Controller* cntl) {
@@ -59,6 +67,7 @@ inline void attachment_transfer_request_row_batch(const 
Params* brpc_request, br
     }
 }
 
+// TODO(zxy) delete in v1.3 version
 // Controller Attachment transferred to Block in ProtoBuf Request.
 template <typename Params>
 inline void attachment_transfer_request_block(const Params* brpc_request, 
brpc::Controller* cntl) {
@@ -71,4 +80,101 @@ inline void attachment_transfer_request_block(const Params* 
brpc_request, brpc::
     }
 }
 
+// Embed tuple_data and the serialized brpc request in the controller attachment.
+//
+// The tuple data is detached from the request while the request is serialized,
+// so the serialized request does not carry the payload a second time (a request
+// that still contains more than 2G of tuple data cannot be serialized). The data
+// is swapped back before returning, so `brpc_request` is left unchanged for the
+// caller.
+//
+// brpc_request: request whose row_batch().tuple_data() holds the payload.
+// closure: its `cntl.request_attachment()` receives the attachment.
+// Returns the status reported by request_embed_attachment().
+template <typename Params, typename Closure>
+inline Status request_embed_attachment_contain_tuple(Params* brpc_request, Closure* closure) {
+    if (!brpc_request->has_row_batch()) {
+        // No row batch to detach: embed the request with an empty payload.
+        return request_embed_attachment(brpc_request, std::string(), closure);
+    }
+    std::string* tuple_data = brpc_request->mutable_row_batch()->mutable_tuple_data();
+    // Swapping is O(1); the payload is never copied here.
+    std::string detached_data;
+    detached_data.swap(*tuple_data);
+    Status st = request_embed_attachment(brpc_request, detached_data, closure);
+    // Restore the payload so the caller still owns an intact request.
+    tuple_data->swap(detached_data);
+    return st;
+}
+
+// Embed column_values and the serialized brpc request in the controller attachment.
+//
+// The column values are detached from the request while the request is
+// serialized, so the serialized request does not carry the payload a second time
+// (a request that still contains more than 2G of column values cannot be
+// serialized). The data is swapped back before returning, so `brpc_request` is
+// left unchanged for the caller.
+//
+// brpc_request: request whose block().column_values() holds the payload.
+// closure: its `cntl.request_attachment()` receives the attachment.
+// Returns the status reported by request_embed_attachment().
+template <typename Params, typename Closure>
+inline Status request_embed_attachment_contain_block(Params* brpc_request, Closure* closure) {
+    if (!brpc_request->has_block()) {
+        // No block to detach: embed the request with an empty payload.
+        return request_embed_attachment(brpc_request, std::string(), closure);
+    }
+    std::string* column_values = brpc_request->mutable_block()->mutable_column_values();
+    // Swapping is O(1); the payload is never copied here.
+    std::string detached_data;
+    detached_data.swap(*column_values);
+    Status st = request_embed_attachment(brpc_request, detached_data, closure);
+    // Restore the payload so the caller still owns an intact request.
+    column_values->swap(detached_data);
+    return st;
+}
+
+// Build the controller attachment from a request and its out-of-band payload.
+//
+// Attachment layout, in order:
+//   int64 size of the serialized request, the serialized request,
+//   int64 size of `data`, `data`.
+//
+// brpc_request: request to serialize into the attachment.
+// data: payload appended after the serialized request.
+// closure: its `cntl.request_attachment()` is swapped with the built attachment.
+// Returns InternalError if the request cannot be serialized, MemoryAllocFailed
+// if appending `data` throws, OK otherwise.
+template <typename Params, typename Closure>
+inline Status request_embed_attachment(Params* brpc_request, const std::string& data,
+                                       Closure* closure) {
+    butil::IOBuf attachment;
+
+    // step1: serialize brpc_request to string, and append to attachment.
+    std::string req_str;
+    if (!brpc_request->SerializeToString(&req_str)) {
+        // Do not send an attachment whose request part is missing or partial.
+        LOG(WARNING) << "request embed attachment failed to serialize request";
+        return Status::InternalError("request embed attachment failed to serialize request");
+    }
+    int64_t req_str_size = req_str.size();
+    attachment.append(&req_str_size, sizeof(req_str_size));
+    attachment.append(req_str);
+
+    // step2: append data to attachment and put it in the closure.
+    int64_t data_size = data.size();
+    attachment.append(&data_size, sizeof(data_size));
+    try {
+        attachment.append(data);
+    } catch (...) {
+        std::exception_ptr p = std::current_exception();
+        LOG(WARNING) << "Try to alloc " << data_size
+                     << " bytes for append data to attachment failed. "
+                     << (p ? p.__cxa_exception_type()->name() : "null");
+        return Status::MemoryAllocFailed(
+                fmt::format("request embed attachment failed to memcpy {} bytes", data_size));
+    }
+    // step3: attachment add to closure.
+    closure->cntl.request_attachment().swap(attachment);
+    return Status::OK();
+}
+
+// Rebuild the brpc request from the controller attachment and store the
+// attached payload in the request's row_batch tuple_data.
+// NOTE(review): the tuple_data pointer is taken before
+// attachment_extract_request() parses into the request; confirm the pointer
+// stays valid across that parse.
+template <typename Params>
+inline Status attachment_extract_request_contain_tuple(const Params* brpc_request,
+                                                       brpc::Controller* cntl) {
+    auto* mutable_request = const_cast<Params*>(brpc_request);
+    std::string* tuple_data = mutable_request->mutable_row_batch()->mutable_tuple_data();
+    return attachment_extract_request(mutable_request, cntl, tuple_data);
+}
+
+// Rebuild the brpc request from the controller attachment and store the
+// attached payload in the request's block column_values.
+// NOTE(review): the column_values pointer is taken before
+// attachment_extract_request() parses into the request; confirm the pointer
+// stays valid across that parse.
+template <typename Params>
+inline Status attachment_extract_request_contain_block(const Params* brpc_request,
+                                                       brpc::Controller* cntl) {
+    auto* mutable_request = const_cast<Params*>(brpc_request);
+    std::string* column_values = mutable_request->mutable_block()->mutable_column_values();
+    return attachment_extract_request(mutable_request, cntl, column_values);
+}
+
+// Rebuild a request and its payload from the controller attachment.
+//
+// Expected attachment layout, in order:
+//   int64 size of the serialized request, the serialized request,
+//   int64 size of the payload, the payload.
+//
+// brpc_request: parsed in place from the serialized request (constness is cast
+//               away, as the callers pass a request they own).
+// cntl: controller whose request_attachment() is read.
+// data: receives the payload bytes.
+// Returns InternalError if the attachment is truncated, a size header is out of
+// range, or the request cannot be parsed; MemoryAllocFailed if copying the
+// payload throws; OK otherwise.
+template <typename Params>
+inline Status attachment_extract_request(const Params* brpc_request, brpc::Controller* cntl,
+                                         std::string* data) {
+    const butil::IOBuf& io_buf = cntl->request_attachment();
+    const size_t total_size = io_buf.length();
+    size_t offset = 0;
+
+    // step1: deserialize request string to brpc_request from attachment.
+    int64_t req_str_size = 0;
+    if (io_buf.copy_to(&req_str_size, sizeof(req_str_size), offset) != sizeof(req_str_size)) {
+        return Status::InternalError(
+                "attachment extract request failed, missing request size header");
+    }
+    offset += sizeof(req_str_size);
+    // The size comes from the wire: reject values that do not fit the attachment.
+    if (req_str_size < 0 || static_cast<size_t>(req_str_size) > total_size - offset) {
+        return Status::InternalError(fmt::format(
+                "attachment extract request failed, invalid request size {}", req_str_size));
+    }
+    std::string req_str;
+    if (io_buf.copy_to(&req_str, req_str_size, offset) != static_cast<size_t>(req_str_size)) {
+        return Status::InternalError(
+                "attachment extract request failed, truncated serialized request");
+    }
+    offset += req_str_size;
+    Params* req = const_cast<Params*>(brpc_request);
+    if (!req->ParseFromString(req_str)) {
+        return Status::InternalError("attachment extract request failed to parse request");
+    }
+
+    // step2: extract data from attachment.
+    int64_t data_size = 0;
+    if (io_buf.copy_to(&data_size, sizeof(data_size), offset) != sizeof(data_size)) {
+        return Status::InternalError(
+                "attachment extract request failed, missing data size header");
+    }
+    offset += sizeof(data_size);
+    if (data_size < 0 || static_cast<size_t>(data_size) > total_size - offset) {
+        return Status::InternalError(fmt::format(
+                "attachment extract request failed, invalid data size {}", data_size));
+    }
+    try {
+        if (io_buf.copy_to(data, data_size, offset) != static_cast<size_t>(data_size)) {
+            return Status::InternalError("attachment extract request failed, truncated data");
+        }
+    } catch (...) {
+        std::exception_ptr p = std::current_exception();
+        LOG(WARNING) << "Try to alloc " << data_size
+                     << " bytes for extract data from attachment failed. "
+                     << (p ? p.__cxa_exception_type()->name() : "null");
+        return Status::MemoryAllocFailed(
+                fmt::format("attachment extract request failed to memcpy {} bytes", data_size));
+    }
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 0d275f6604..bcc06b51b1 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -721,7 +721,7 @@ Status Block::filter_block(Block* block, int 
filter_column_id, int column_to_kee
 }
 
 Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* 
compressed_bytes,
-                        std::string* allocated_buf) const {
+                        bool allow_transfer_large_data) const {
     // calc uncompressed size for allocation
     size_t content_uncompressed_size = 0;
     for (const auto& c : *this) {
@@ -733,8 +733,20 @@ Status Block::serialize(PBlock* pblock, size_t* 
uncompressed_bytes, size_t* comp
 
     // serialize data values
     // when data type is HLL, content_uncompressed_size maybe larger than real 
size.
-    allocated_buf->resize(content_uncompressed_size);
-    char* buf = allocated_buf->data();
+    std::string* column_values = nullptr;
+    try {
+        column_values = pblock->mutable_column_values();
+        column_values->resize(content_uncompressed_size);
+    } catch (...) {
+        std::exception_ptr p = std::current_exception();
+        std::string msg = fmt::format(
+                "Try to alloc {} bytes for pblock column values failed. reason 
{}",
+                content_uncompressed_size, p ? 
p.__cxa_exception_type()->name() : "null");
+        LOG(WARNING) << msg;
+        return Status::BufferAllocFailed(msg);
+    }
+    char* buf = column_values->data();
+
     for (const auto& c : *this) {
         buf = c.type->serialize(*(c.column), buf);
     }
@@ -750,12 +762,12 @@ Status Block::serialize(PBlock* pblock, size_t* 
uncompressed_bytes, size_t* comp
 
         size_t compressed_size = 0;
         char* compressed_output = compression_scratch.data();
-        snappy::RawCompress(allocated_buf->data(), content_uncompressed_size, 
compressed_output,
+        snappy::RawCompress(column_values->data(), content_uncompressed_size, 
compressed_output,
                             &compressed_size);
 
         if (LIKELY(compressed_size < content_uncompressed_size)) {
             compression_scratch.resize(compressed_size);
-            allocated_buf->swap(compression_scratch);
+            column_values->swap(compression_scratch);
             pblock->set_compressed(true);
             *compressed_bytes = compressed_size;
         } else {
@@ -765,7 +777,10 @@ Status Block::serialize(PBlock* pblock, size_t* 
uncompressed_bytes, size_t* comp
         VLOG_ROW << "uncompressed size: " << content_uncompressed_size
                  << ", compressed size: " << compressed_size;
     }
-
+    if (!allow_transfer_large_data && *compressed_bytes >= 
std::numeric_limits<int32_t>::max()) {
+        return Status::InternalError(fmt::format(
+                "The block is large than 2GB({}), can not send by Protobuf.", 
*compressed_bytes));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 9bc86c4533..31b3602a8b 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -248,7 +248,8 @@ public:
     }
 
     // serialize block to PBlock
-    Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* 
compressed_bytes, std::string* allocated_buf) const;
+    Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* 
compressed_bytes,
+                     bool allow_transfer_large_data = false) const;
 
     // serialize block to PRowbatch
     void serialize(RowBatch*, const RowDescriptor&);
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index cbadfc931f..0f1cf57c7f 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -50,6 +50,11 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) 
{
     _finst_id.set_hi(_fragment_instance_id.hi);
     _finst_id.set_lo(_fragment_instance_id.lo);
     _brpc_request.set_allocated_finst_id(&_finst_id);
+
+    _query_id.set_hi(state->query_id().hi);
+    _query_id.set_lo(state->query_id().lo);
+    _brpc_request.set_allocated_query_id(&_query_id);
+
     _brpc_request.set_node_id(_dest_node_id);
     _brpc_request.set_sender_id(_parent->_sender_id);
     _brpc_request.set_be_number(_be_number);
@@ -67,6 +72,7 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) {
     // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
     // so the empty channel not need call function close_internal()
     _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo 
!= -1);
+    _state = state;
     return Status::OK();
 }
 
@@ -138,13 +144,27 @@ Status VDataStreamSender::Channel::send_block(PBlock* 
block, bool eos) {
     _closure->ref();
     _closure->cntl.set_timeout_ms(_brpc_timeout_ms);
 
-    if (_brpc_request.has_block()) {
-        request_block_transfer_attachment<PTransmitDataParams,
-            RefCountClosure<PTransmitDataResult>>(&_brpc_request, 
_parent->_column_values_buffer,
-                    _closure);
+    if (_parent->_transfer_large_data_by_brpc && _brpc_request.has_block() &&
+        _brpc_request.block().has_column_values() &&
+        _brpc_request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
+        Status st = request_embed_attachment_contain_block<PTransmitDataParams,
+                                                           
RefCountClosure<PTransmitDataResult>>(
+                &_brpc_request, _closure);
+        RETURN_IF_ERROR(st);
+        std::string brpc_url =
+                fmt::format("http://{}:{}", _brpc_dest_addr.hostname, _brpc_dest_addr.port);
+        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
+                
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
+                                                                               
           "http");
+        _closure->cntl.http_request().uri() =
+                brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
+        _closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+        _closure->cntl.http_request().set_content_type("application/json");
+        _brpc_http_stub->transmit_block_by_http(&_closure->cntl, NULL, 
&_closure->result, _closure);
+    } else {
+        _closure->cntl.http_request().Clear();
+        _brpc_stub->transmit_block(&_closure->cntl, &_brpc_request, 
&_closure->result, _closure);
     }
-
-    _brpc_stub->transmit_block(&_closure->cntl, &_brpc_request, 
&_closure->result, _closure);
     if (block != nullptr) {
         _brpc_request.release_block();
     }
@@ -259,7 +279,8 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int 
sender_id, const RowD
           _serialize_batch_timer(nullptr),
           _bytes_sent_counter(nullptr),
           _local_bytes_send_counter(nullptr),
-          _dest_node_id(sink.dest_node_id) {
+          _dest_node_id(sink.dest_node_id),
+          _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
     DCHECK_GT(destinations.size(), 0);
     DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
            sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -563,7 +584,8 @@ Status VDataStreamSender::serialize_block(Block* src, 
PBlock* dest, int num_rece
         SCOPED_TIMER(_serialize_batch_timer);
         dest->Clear();
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, 
&compressed_bytes, &_column_values_buffer));
+        RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, 
&compressed_bytes,
+                                       _transfer_large_data_by_brpc));
         COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
         COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * 
num_receivers);
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 67faae452b..09c9549a5b 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -144,10 +144,8 @@ protected:
     // Identifier of the destination plan node.
     PlanNodeId _dest_node_id;
 
-    // This buffer is used to store the serialized block data
-    // The data in the buffer is copied to the attachment of the brpc when it 
is sent,
-    // to avoid an extra pb serialization in the brpc.
-    std::string _column_values_buffer;
+    // Users can change this config at runtime; keep a copy here so the value does not change during a query or loading process.
+    bool _transfer_large_data_by_brpc = false;
 };
 
 // TODO: support local exechange
@@ -188,6 +186,7 @@ public:
         }
         // release this before request desctruct
         _brpc_request.release_finst_id();
+        _brpc_request.release_query_id();
     }
 
     // Initialize channel.
@@ -279,6 +278,7 @@ private:
     TNetworkAddress _brpc_dest_addr;
 
     PUniqueId _finst_id;
+    PUniqueId _query_id;
     PBlock _pb_block;
     PTransmitDataParams _brpc_request;
     std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
@@ -287,6 +287,7 @@ private:
     // whether the dest can be treated as query statistics transfer chain.
     bool _is_transfer_chain;
     bool _send_query_statistics_with_every_batch;
+    RuntimeState* _state;
 
     size_t _capacity;
     bool _is_local;
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 6de189099e..b3c948688c 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -17,6 +17,8 @@
 
 #include "vec/sink/vtablet_sink.h"
 
+#include "util/brpc_client_cache.h"
+#include "util/debug/sanitizer_scopes.h"
 #include "util/doris_metrics.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr.h"
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index 3d8366d1d3..a6f4fef966 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -157,14 +157,13 @@ TEST(BlockTest, RowBatchCovertToBlock) {
 void block_to_pb(const vectorized::Block& block, PBlock* pblock) {
     size_t uncompressed_bytes = 0;
     size_t compressed_bytes = 0;
-    std::string column_values_buffer;
-    Status st = block.serialize(pblock, &uncompressed_bytes, 
&compressed_bytes, &column_values_buffer);
+    Status st = block.serialize(pblock, &uncompressed_bytes, 
&compressed_bytes);
     EXPECT_TRUE(st.ok());
     EXPECT_TRUE(uncompressed_bytes >= compressed_bytes);
-    EXPECT_EQ(compressed_bytes, column_values_buffer.size());
-    pblock->set_column_values(column_values_buffer);
-    
-    const vectorized::ColumnWithTypeAndName& type_and_name = 
block.get_columns_with_type_and_name()[0];
+    EXPECT_EQ(compressed_bytes, pblock->column_values().size());
+
+    const vectorized::ColumnWithTypeAndName& type_and_name =
+            block.get_columns_with_type_and_name()[0];
     EXPECT_EQ(type_and_name.name, pblock->column_metas()[0].name());
 }
 
diff --git a/docs/en/administrator-guide/config/be_config.md 
b/docs/en/administrator-guide/config/be_config.md
index b1d9f9c5eb..8afd14f61a 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -186,11 +186,11 @@ This configuration is mainly used to modify the parameter 
`socket_max_unwritten_
 
 Sometimes the query fails and an error message of `The server is overcrowded` 
will appear in the BE log. This means there are too many messages to buffer at 
the sender side, which may happen when the SQL needs to send large bitmap 
value. You can avoid this error by increasing the configuration.
 
-### `transfer_data_by_brpc_attachment`
+### `transfer_large_data_by_brpc`
 
 * Type: bool
-* Description: This configuration is used to control whether to transfer the 
RowBatch in the ProtoBuf Request to the Controller Attachment and then send it 
through brpc. When the length of ProtoBuf Request exceeds 2G, an error will be 
reported: Bad request, error_text=[E1003]Fail to compress request, Putting 
RowBatch in Controller Attachment will be faster and avoid this error.
-* Default value: false
+* Description: This configuration controls whether, when the length of the Tuple/Block data is greater than 1.8G, the protoBuf request is serialized and embedded together with the Tuple/Block data into the controller attachment and sent through http brpc. This avoids the error that occurs when the length of the protoBuf request exceeds 2G: Bad request, error_text=[E1003]Fail to compress request. In past versions, the Tuple/Block data was put into the attachment and sent through the default baidu_std brpc, but the attachment is truncated when it exceeds 2G; there is no 2G limit when sending through http brpc.
+* Default value: true
 
 ### `brpc_num_threads`
 
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md 
b/docs/zh-CN/administrator-guide/config/be_config.md
index 1dd7e7a015..abbfc3b20b 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -179,11 +179,11 @@ Metrics: 
{"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
 
 有时查询失败,BE 日志中会出现 `The server is overcrowded` 
的错误信息,表示连接上有过多的未发送数据。当查询需要发送较大的bitmap字段时,可能会遇到该问题,此时可能通过调大该配置避免该错误。
 
-### `transfer_data_by_brpc_attachment`
+### `transfer_large_data_by_brpc`
 
 * 类型: bool
-* 描述:该配置用来控制是否将ProtoBuf Request中的RowBatch转移到Controller 
Attachment后通过brpc发送。ProtoBuf Request的长度超过2G时会报错: Bad request, 
error_text=[E1003]Fail to compress request,将RowBatch放到Controller 
Attachment中将更快且避免这个错误。
-* 默认值:false
+* 描述:该配置用来控制是否在 Tuple/Block data 长度大于1.8G时,将 protoBuf request 序列化后和 
Tuple/Block data 一起嵌入到 controller attachment 后通过 http brpc 发送。为了避免 protoBuf 
request 的长度超过2G时的错误:Bad request, error_text=[E1003]Fail to compress 
request。在过去的版本中,曾将 Tuple/Block data 放入 attachment 后通过默认的 baidu_std brpc 发送,但 
attachment 超过2G时将被截断,通过 http brpc 发送不存在2G的限制。
+* 默认值:true
 
 ### `brpc_num_threads`
 
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 45662c10cd..9ce6194ddf 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -46,6 +46,7 @@ message PTransmitDataParams {
     optional PBlock block = 9;
     // transfer the RowBatch to the Controller Attachment
     optional bool transfer_by_attachment = 10 [default = false];
+    optional PUniqueId query_id = 11;
 };
 
 message PTransmitDataResult {
@@ -431,8 +432,11 @@ message PResetRPCChannelResponse {
     repeated string channels = 2;
 };
 
+message PEmptyRequest {};
+
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
+    rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
     // If #fragments of a query is < 3, use exec_plan_fragment directly.
     // If #fragments of a query is >=3, use exec_plan_fragment_prepare + 
exec_plan_fragment_start
     rpc exec_plan_fragment(PExecPlanFragmentRequest) returns 
(PExecPlanFragmentResult);
@@ -442,6 +446,7 @@ service PBackendService {
     rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
     rpc tablet_writer_open(PTabletWriterOpenRequest) returns 
(PTabletWriterOpenResult);
     rpc tablet_writer_add_batch(PTabletWriterAddBatchRequest) returns 
(PTabletWriterAddBatchResult);
+    rpc tablet_writer_add_batch_by_http(PEmptyRequest) returns 
(PTabletWriterAddBatchResult);
     rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns 
(PTabletWriterCancelResult);
     rpc get_info(PProxyRequest) returns (PProxyResult); 
     rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse);
@@ -454,6 +459,7 @@ service PBackendService {
     rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
     rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
     rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult);
+    rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult);
     rpc check_rpc_channel(PCheckRPCChannelRequest) returns 
(PCheckRPCChannelResponse);
     rpc reset_rpc_channel(PResetRPCChannelRequest) returns 
(PResetRPCChannelResponse);
     rpc hand_shake(PHandShakeRequest) returns (PHandShakeResponse);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to