This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 1e16352526fa398dc6f3799d7256115c1313a73d Author: Xinyi Zou <[email protected]> AuthorDate: Mon Jun 13 20:41:48 2022 +0800 [fix](brpc) Embed serialized request into the attachment and transmit it through http brpc (#9803) When the length of `Tuple/Block data` is greater than 2G, serialize the ProtoBuf request, embed it together with the `Tuple/Block data` into the controller attachment, and transmit it through http brpc. This avoids the error raised when the length of the ProtoBuf request exceeds 2G: `Bad request, error_text=[E1003]Fail to compress request`. In #7164, `Tuple/Block data` was put into the attachment and sent via the default `baidu_std` brpc, but an attachment that exceeds 2G is truncated. There is no 2G limit when sending via http brpc. Also, in #7921, putting `Tuple/Block data` into the attachment by default was considered, as this theoretically saves one serialization and improves performance. However, testing showed that performance did not improve, while the memory peak increased because of the additional memory copy. 
--- be/src/common/config.h | 8 +- be/src/exec/tablet_sink.cpp | 39 ++++-- be/src/exec/tablet_sink.h | 14 +- be/src/runtime/data_stream_sender.cpp | 51 ++++--- be/src/runtime/data_stream_sender.h | 15 +- be/src/runtime/row_batch.cpp | 52 +++---- be/src/runtime/row_batch.h | 4 +- be/src/service/internal_service.cpp | 156 ++++++++++++++++++--- be/src/service/internal_service.h | 29 ++++ be/src/util/brpc_client_cache.h | 21 ++- be/src/util/proto_util.h | 110 ++++++++++++++- be/src/vec/core/block.cpp | 27 +++- be/src/vec/core/block.h | 3 +- be/src/vec/sink/vdata_stream_sender.cpp | 38 +++-- be/src/vec/sink/vdata_stream_sender.h | 9 +- be/src/vec/sink/vtablet_sink.cpp | 2 + be/test/vec/core/block_test.cpp | 11 +- docs/en/administrator-guide/config/be_config.md | 6 +- docs/zh-CN/administrator-guide/config/be_config.md | 6 +- gensrc/proto/internal_service.proto | 6 + 20 files changed, 457 insertions(+), 150 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index fad7459bef..32d19468bf 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -549,9 +549,11 @@ CONF_String(default_rowset_type, "BETA"); CONF_Int64(brpc_max_body_size, "3147483648"); // Max unwritten bytes in each socket, if the limit is reached, Socket.Write fails with EOVERCROWDED CONF_Int64(brpc_socket_max_unwritten_bytes, "1073741824"); -// Whether to transfer RowBatch in ProtoBuf Request to Controller Attachment and send it -// through brpc, this will be faster and avoid the error of Request length overflow. -CONF_mBool(transfer_data_by_brpc_attachment, "false"); +// TODO(zxy): expect to be true in v1.3 +// Whether to embed the ProtoBuf Request serialized string together with Tuple/Block data into +// Controller Attachment and send it through http brpc when the length of the Tuple/Block data +// is greater than 1.8G. This is to avoid the error of Request length overflow (2G). 
+CONF_mBool(transfer_large_data_by_brpc, "false"); // max number of txns for every txn_partition_map in txn manager // this is a self protection to avoid too many txns saving in manager diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index f40d1b0211..f02d8bd375 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -46,9 +46,6 @@ namespace stream_load { NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id) : _parent(parent), _index_channel(index_channel), _node_id(node_id) { - if (_parent->_transfer_data_by_brpc_attachment) { - _tuple_data_buffer_ptr = &_tuple_data_buffer; - } } NodeChannel::~NodeChannel() noexcept { @@ -71,6 +68,7 @@ NodeChannel::~NodeChannel() noexcept { // returned directly via "TabletSink::prepare()" method. Status NodeChannel::init(RuntimeState* state) { _tuple_desc = _parent->_output_tuple_desc; + _state = state; auto node = _parent->_nodes_info->find_node(_node_id); if (node == nullptr) { std::stringstream ss; @@ -495,7 +493,7 @@ void NodeChannel::try_send_batch() { SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); size_t uncompressed_bytes = 0, compressed_bytes = 0; Status st = row_batch->serialize(request.mutable_row_batch(), &uncompressed_bytes, - &compressed_bytes, _tuple_data_buffer_ptr); + &compressed_bytes, _parent->_transfer_large_data_by_brpc); if (!st.ok()) { cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg())); _add_batch_closure->clear_in_flight(); @@ -539,14 +537,31 @@ void NodeChannel::try_send_batch() { CHECK(_pending_batches_num == 0) << _pending_batches_num; } - if (_parent->_transfer_data_by_brpc_attachment && request.has_row_batch()) { - request_row_batch_transfer_attachment<PTabletWriterAddBatchRequest, - ReusableClosure<PTabletWriterAddBatchResult>>( - &request, _tuple_data_buffer, _add_batch_closure); + if (_parent->_transfer_large_data_by_brpc && request.has_row_batch() && + request.row_batch().has_tuple_data() && 
request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) { + Status st = request_embed_attachment_contain_tuple< + PTabletWriterAddBatchRequest, ReusableClosure<PTabletWriterAddBatchResult>>( + &request, _add_batch_closure); + if (!st.ok()) { + cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg())); + _add_batch_closure->clear_in_flight(); + return; + } + std::string brpc_url = fmt::format("http://{}:{}", _node_info.host, _node_info.brpc_port); + std::shared_ptr<PBackendService_Stub> _brpc_http_stub = + _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, + "http"); + _add_batch_closure->cntl.http_request().uri() = + brpc_url + "/PInternalServiceImpl/tablet_writer_add_batch_by_http"; + _add_batch_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST); + _add_batch_closure->cntl.http_request().set_content_type("application/json"); + _brpc_http_stub->tablet_writer_add_batch_by_http( + &_add_batch_closure->cntl, NULL, &_add_batch_closure->result, _add_batch_closure); + } else { + _add_batch_closure->cntl.http_request().Clear(); + _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, + &_add_batch_closure->result, _add_batch_closure); } - _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, &_add_batch_closure->result, - _add_batch_closure); - _next_packet_seq++; } @@ -690,7 +705,7 @@ OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, *status = Expr::create_expr_trees(_pool, texprs, &_output_expr_ctxs); } _name = "OlapTableSink"; - _transfer_data_by_brpc_attachment = config::transfer_data_by_brpc_attachment; + _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc; } OlapTableSink::~OlapTableSink() { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index a6af6c15e7..1029cecb4a 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -284,15 +284,6 @@ private: std::atomic<int64_t> _queue_push_lock_ns {0}; std::atomic<int64_t> 
_actual_consume_ns {0}; - // buffer for saving serialized row batch data. - // In the non-attachment approach, we need to use two PRowBatch structures alternately - // so that when one PRowBatch is sent, the other PRowBatch can be used for the serialization of the next RowBatch. - // This is not necessary with the attachment approach, because the memory structures - // are already copied into attachment memory before sending, and will wait for - // the previous RPC to be fully completed before the next copy. - std::string _tuple_data_buffer; - std::string* _tuple_data_buffer_ptr = nullptr; - // the timestamp when this node channel be marked closed and finished closed uint64_t _close_time_ms = 0; @@ -306,6 +297,7 @@ private: // The IndexChannel is definitely accessible until the NodeChannel is closed. std::mutex _closed_lock; bool _is_closed = false; + RuntimeState* _state; }; class IndexChannel { @@ -492,8 +484,8 @@ protected: // Save the status of close() method Status _close_status; - // TODO(cmy): this should be removed after we switch to rpc attachment by default. - bool _transfer_data_by_brpc_attachment = false; + // User can change this config at runtime, avoid it being modified during query or loading process. 
+ bool _transfer_large_data_by_brpc = false; // FIND_TABLET_EVERY_ROW is used for both hash and random distribution info, which indicates that we // should compute tablet index for every row diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 93b0a128e6..ca095cef64 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -82,6 +82,7 @@ DataStreamSender::Channel::~Channel() { } // release this before request desctruct _brpc_request.release_finst_id(); + _brpc_request.release_query_id(); } Status DataStreamSender::Channel::init(RuntimeState* state) { @@ -101,6 +102,11 @@ Status DataStreamSender::Channel::init(RuntimeState* state) { _finst_id.set_hi(_fragment_instance_id.hi); _finst_id.set_lo(_fragment_instance_id.lo); _brpc_request.set_allocated_finst_id(&_finst_id); + + _query_id.set_hi(state->query_id().hi); + _query_id.set_lo(state->query_id().lo); + _brpc_request.set_allocated_query_id(&_query_id); + _brpc_request.set_node_id(_dest_node_id); _brpc_request.set_sender_id(_parent->_sender_id); _brpc_request.set_be_number(_be_number); @@ -120,6 +126,7 @@ Status DataStreamSender::Channel::init(RuntimeState* state) { return Status::InternalError(msg); } } + _state = state; return Status::OK(); } @@ -147,12 +154,28 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { _closure->ref(); _closure->cntl.set_timeout_ms(_brpc_timeout_ms); - if (_parent->_transfer_data_by_brpc_attachment && _brpc_request.has_row_batch()) { - request_row_batch_transfer_attachment<PTransmitDataParams, - RefCountClosure<PTransmitDataResult>>(&_brpc_request, _parent->_tuple_data_buffer, - _closure); + if (_parent->_transfer_large_data_by_brpc && _brpc_request.has_row_batch() && + _brpc_request.row_batch().has_tuple_data() && + _brpc_request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) { + Status st = request_embed_attachment_contain_tuple<PTransmitDataParams, + 
RefCountClosure<PTransmitDataResult>>( + &_brpc_request, _closure); + RETURN_IF_ERROR(st); + std::string brpc_url = + fmt::format("http://{}:{}", _brpc_dest_addr.hostname, _brpc_dest_addr.port); + std::shared_ptr<PBackendService_Stub> _brpc_http_stub = + _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, + "http"); + _closure->cntl.http_request().uri() = + brpc_url + "/PInternalServiceImpl/transmit_data_by_http"; + _closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST); + _closure->cntl.http_request().set_content_type("application/json"); + _brpc_http_stub->transmit_data_by_http(&_closure->cntl, NULL, &_closure->result, _closure); + } else { + _closure->cntl.http_request().Clear(); + _brpc_stub->transmit_data(&_closure->cntl, &_brpc_request, &_closure->result, _closure); } - _brpc_stub->transmit_data(&_closure->cntl, &_brpc_request, &_closure->result, _closure); + if (batch != nullptr) { _brpc_request.release_row_batch(); } @@ -272,13 +295,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDes _sender_id(sender_id), _serialize_batch_timer(nullptr), _bytes_sent_counter(nullptr), - _local_bytes_send_counter(nullptr), - _transfer_data_by_brpc_attachment(config::transfer_data_by_brpc_attachment) { - - if (_transfer_data_by_brpc_attachment) { - _tuple_data_buffer_ptr = &_tuple_data_buffer; - } -} + _local_bytes_send_counter(nullptr) {} DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TDataStreamSink& sink, @@ -297,12 +314,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDes _part_type(sink.output_partition.type), _ignore_not_found(sink.__isset.ignore_not_found ? 
sink.ignore_not_found : true), _dest_node_id(sink.dest_node_id), - _transfer_data_by_brpc_attachment(config::transfer_data_by_brpc_attachment) { - - if (_transfer_data_by_brpc_attachment) { - _tuple_data_buffer_ptr = &_tuple_data_buffer; - } - + _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || sink.output_partition.type == TPartitionType::HASH_PARTITIONED || @@ -668,7 +680,8 @@ Status DataStreamSender::serialize_batch(RowBatch* src, PRowBatch* dest, int num { SCOPED_TIMER(_serialize_batch_timer); size_t uncompressed_bytes = 0, compressed_bytes = 0; - RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes, _tuple_data_buffer_ptr)); + RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes, + _transfer_large_data_by_brpc)); COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers); COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); } diff --git a/be/src/runtime/data_stream_sender.h b/be/src/runtime/data_stream_sender.h index 1da9d32026..377d13f1fa 100644 --- a/be/src/runtime/data_stream_sender.h +++ b/be/src/runtime/data_stream_sender.h @@ -197,8 +197,9 @@ protected: TNetworkAddress _brpc_dest_addr; - // TODO(zc): initused for brpc + // TODO(zc): init used for brpc PUniqueId _finst_id; + PUniqueId _query_id; // serialized batches for broadcasting; we need two so we can write // one while the other one is still being sent. @@ -211,6 +212,7 @@ protected: PTransmitDataParams _brpc_request; std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr; RefCountClosure<PTransmitDataResult>* _closure = nullptr; + RuntimeState* _state; int32_t _brpc_timeout_ms = 500; // whether the dest can be treated as query statistics transfer chain. 
bool _is_transfer_chain; @@ -257,14 +259,6 @@ private: PRowBatch _pb_batch1; PRowBatch _pb_batch2; - // This buffer is used to store the serialized rowbatch data. - // Only works when `config::transfer_data_by_brpc_attachment` is true. - // The data in the buffer is copied to the attachment of the brpc when it is sent, - // to avoid an extra pb serialization in the brpc. - // _tuple_data_buffer_ptr will point to _tuple_data_buffer if `config::transfer_data_by_brpc_attachment` is true. - std::string _tuple_data_buffer; - std::string* _tuple_data_buffer_ptr = nullptr; - std::vector<ExprContext*> _partition_expr_ctxs; // compute per-row partition values // map from range value to partition_id @@ -279,7 +273,8 @@ private: // Identifier of the destination plan node. PlanNodeId _dest_node_id; - bool _transfer_data_by_brpc_attachment = false; + // User can change this config at runtime, avoid it being modified during query or loading process. + bool _transfer_large_data_by_brpc = false; }; } // namespace doris diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index cdbbebbfed..02d70b16e1 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -237,7 +237,7 @@ RowBatch::~RowBatch() { } Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, - size_t* compressed_size, std::string* allocated_buf) { + size_t* compressed_size, bool allow_transfer_large_data) { // num_rows output_batch->set_num_rows(_num_rows); // row_tuples @@ -252,19 +252,9 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, // is_compressed output_batch->set_is_compressed(false); // tuple data - size_t size = total_byte_size(); - std::string* mutable_tuple_data = nullptr; - if (allocated_buf != nullptr) { - allocated_buf->resize(size); - // all tuple data will be written in the allocated_buf - // instead of tuple_data in PRowBatch - mutable_tuple_data = allocated_buf; - // tuple_data is a required field - 
output_batch->set_tuple_data(""); - } else { - mutable_tuple_data = output_batch->mutable_tuple_data(); - mutable_tuple_data->resize(size); - } + size_t tuple_byte_size = total_byte_size(); + std::string* mutable_tuple_data = output_batch->mutable_tuple_data(); + mutable_tuple_data->resize(tuple_byte_size); // Copy tuple data, including strings, into output_batch (converting string // pointers into offsets in the process) @@ -288,15 +278,15 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, mutable_tuple_offsets->Add((int32_t) offset); mutable_new_tuple_offsets->Add(offset); row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true); - CHECK_LE(offset, size); + CHECK_LE(offset, tuple_byte_size); } } - CHECK_EQ(offset, size) << "offset: " << offset << " vs. size: " << size; + CHECK_EQ(offset, tuple_byte_size) << "offset: " << offset << " vs. size: " << tuple_byte_size; - if (config::compress_rowbatches && size > 0) { + if (config::compress_rowbatches && tuple_byte_size > 0) { // Try compressing tuple_data to _compression_scratch, swap if compressed data is // smaller - uint32_t max_compressed_size = snappy::MaxCompressedLength(size); + uint32_t max_compressed_size = snappy::MaxCompressedLength(tuple_byte_size); if (_compression_scratch.size() < max_compressed_size) { _compression_scratch.resize(max_compressed_size); @@ -304,33 +294,25 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, size_t compressed_size = 0; char* compressed_output = _compression_scratch.data(); - snappy::RawCompress(mutable_tuple_data->data(), size, compressed_output, &compressed_size); + snappy::RawCompress(mutable_tuple_data->data(), tuple_byte_size, compressed_output, &compressed_size); - if (LIKELY(compressed_size < size)) { + if (LIKELY(compressed_size < tuple_byte_size)) { _compression_scratch.resize(compressed_size); mutable_tuple_data->swap(_compression_scratch); output_batch->set_is_compressed(true); 
} - VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size; + VLOG_ROW << "uncompressed size: " << tuple_byte_size << ", compressed size: " << compressed_size; } // return compressed and uncompressed size size_t pb_size = get_batch_size(*output_batch); - if (allocated_buf == nullptr) { - *uncompressed_size = pb_size - mutable_tuple_data->size() + size; - *compressed_size = pb_size; - if (pb_size > std::numeric_limits<int32_t>::max()) { - // the protobuf has a hard limit of 2GB for serialized data. - return Status::InternalError( - fmt::format("The rowbatch is large than 2GB({}), can not send by Protobuf. " - "please set BE config 'transfer_data_by_brpc_attachment' to true " - "and restart BE.", - pb_size)); - } - } else { - *uncompressed_size = pb_size + size; - *compressed_size = pb_size + mutable_tuple_data->size(); + *uncompressed_size = pb_size - mutable_tuple_data->size() + tuple_byte_size; + *compressed_size = pb_size; + if (!allow_transfer_large_data && pb_size > std::numeric_limits<int32_t>::max()) { + // the protobuf has a hard limit of 2GB for serialized data. + return Status::InternalError(fmt::format( + "The rowbatch is large than 2GB({}), can not send by Protobuf.", pb_size)); } return Status::OK(); } diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h index 070a1e578f..5aad1a0e62 100644 --- a/be/src/runtime/row_batch.h +++ b/be/src/runtime/row_batch.h @@ -351,10 +351,8 @@ public: // This function does not reset(). // Returns the uncompressed serialized size (this will be the true size of output_batch // if tuple_data is actually uncompressed). - // if allocated_buf is not null, the serialized tuple data will be saved in this buf - // instead of `tuple_data` in PRowBatch. Status serialize(PRowBatch* output_batch, size_t* uncompressed_size, size_t* compressed_size, - std::string* allocated_buf = nullptr); + bool allow_transfer_large_data = false); // Utility function: returns total size of batch. 
static size_t get_batch_size(const PRowBatch& batch); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index c1185591e6..c4c48a3c8b 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -42,6 +42,28 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT); +template <typename T> +class NewHttpClosure : public ::google::protobuf::Closure { +public: + NewHttpClosure(T* request, google::protobuf::Closure* done) : _request(request), _done(done) {} + ~NewHttpClosure() {} + + void Run() { + if (_request != nullptr) { + delete _request; + _request = nullptr; + } + if (_done != nullptr) { + _done->Run(); + } + delete this; + } + +private: + T* _request = nullptr; + google::protobuf::Closure* _done = nullptr; +}; + template <typename T> PInternalServiceImpl<T>::PInternalServiceImpl(ExecEnv* exec_env) : _exec_env(exec_env), _tablet_worker_pool(config::number_tablet_writer_threads, 10240) { @@ -63,15 +85,51 @@ void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt << " node=" << request->node_id(); brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); attachment_transfer_request_row_batch<PTransmitDataParams>(request, cntl); + + _transmit_data(cntl_base, request, response, done, Status::OK()); +} + +template <typename T> +void PInternalServiceImpl<T>::transmit_data_by_http(google::protobuf::RpcController* cntl_base, + const PEmptyRequest* request, + PTransmitDataResult* response, + google::protobuf::Closure* done) { + PTransmitDataParams* request_raw = new PTransmitDataParams(); + google::protobuf::Closure* done_raw = + new NewHttpClosure<PTransmitDataParams>(request_raw, done); + brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); + Status st = attachment_extract_request_contain_tuple<PTransmitDataParams>(request_raw, cntl); + _transmit_data(cntl_base, request_raw, response, done_raw, st); +} + 
+template <typename T> +void PInternalServiceImpl<T>::_transmit_data(google::protobuf::RpcController* cntl_base, + const PTransmitDataParams* request, + PTransmitDataResult* response, + google::protobuf::Closure* done, + const Status& extract_st) { + std::string query_id; + if (request->has_query_id()) { + query_id = print_id(request->query_id()); + TUniqueId finst_id; + finst_id.__set_hi(request->finst_id().hi()); + finst_id.__set_lo(request->finst_id().lo()); + } + VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) + << " query_id=" << query_id << " node=" << request->node_id(); // The response is accessed when done->Run is called in transmit_data(), // give response a default value to avoid null pointers in high concurrency. Status st; st.to_protobuf(response->mutable_status()); - st = _exec_env->stream_mgr()->transmit_data(request, &done); - if (!st.ok()) { - LOG(WARNING) << "transmit_data failed, message=" << st.get_error_msg() - << ", fragment_instance_id=" << print_id(request->finst_id()) - << ", node=" << request->node_id(); + if (extract_st.ok()) { + st = _exec_env->stream_mgr()->transmit_data(request, &done); + if (!st.ok()) { + LOG(WARNING) << "transmit_data failed, message=" << st.get_error_msg() + << ", fragment_instance_id=" << print_id(request->finst_id()) + << ", node=" << request->node_id(); + } + } else { + st = extract_st; } if (done != nullptr) { st.to_protobuf(response->mutable_status()); @@ -133,9 +191,34 @@ void PInternalServiceImpl<T>::exec_plan_fragment_start(google::protobuf::RpcCont template <typename T> void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcController* cntl_base, - const PTabletWriterAddBatchRequest* request, - PTabletWriterAddBatchResult* response, - google::protobuf::Closure* done) { + const PTabletWriterAddBatchRequest* request, + PTabletWriterAddBatchResult* response, + google::protobuf::Closure* done) { + _tablet_writer_add_batch(cntl_base, request, response, 
done); +} + +template <typename T> +void PInternalServiceImpl<T>::tablet_writer_add_batch_by_http( + google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, + PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { + PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest(); + google::protobuf::Closure* done_raw = + new NewHttpClosure<PTabletWriterAddBatchRequest>(request_raw, done); + brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); + Status st = attachment_extract_request_contain_tuple<PTabletWriterAddBatchRequest>(request_raw, + cntl); + if (st.ok()) { + _tablet_writer_add_batch(cntl_base, request_raw, response, done_raw); + } else { + st.to_protobuf(response->mutable_status()); + } +} + +template <typename T> +void PInternalServiceImpl<T>::_tablet_writer_add_batch(google::protobuf::RpcController* cntl_base, + const PTabletWriterAddBatchRequest* request, + PTabletWriterAddBatchResult* response, + google::protobuf::Closure* done) { VLOG_RPC << "tablet writer add batch, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); @@ -149,8 +232,10 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); + // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, cntl); + auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); if (!st.ok()) { LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg() @@ -460,22 +545,57 @@ Status PInternalServiceImpl<T>::_fold_constant_expr(const std::string& ser_reque template <typename T> void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cntl_base, - const 
PTransmitDataParams* request, - PTransmitDataResult* response, - google::protobuf::Closure* done) { - VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) - << " node=" << request->node_id(); + const PTransmitDataParams* request, + PTransmitDataResult* response, + google::protobuf::Closure* done) { + // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); attachment_transfer_request_block<PTransmitDataParams>(request, cntl); + + _transmit_block(cntl_base, request, response, done, Status::OK()); +} + +template <typename T> +void PInternalServiceImpl<T>::transmit_block_by_http(google::protobuf::RpcController* cntl_base, + const PEmptyRequest* request, + PTransmitDataResult* response, + google::protobuf::Closure* done) { + PTransmitDataParams* request_raw = new PTransmitDataParams(); + google::protobuf::Closure* done_raw = + new NewHttpClosure<PTransmitDataParams>(request_raw, done); + brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); + Status st = attachment_extract_request_contain_block<PTransmitDataParams>(request_raw, cntl); + _transmit_block(cntl_base, request_raw, response, done_raw, st); +} + +template <typename T> +void PInternalServiceImpl<T>::_transmit_block(google::protobuf::RpcController* cntl_base, + const PTransmitDataParams* request, + PTransmitDataResult* response, + google::protobuf::Closure* done, + const Status& extract_st) { + std::string query_id; + if (request->has_query_id()) { + query_id = print_id(request->query_id()); + TUniqueId finst_id; + finst_id.__set_hi(request->finst_id().hi()); + finst_id.__set_lo(request->finst_id().lo()); + } + VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id()) + << " query_id=" << query_id << " node=" << request->node_id(); // The response is accessed when done->Run is called in transmit_block(), // give response a default value to avoid null pointers in high concurrency. 
Status st; st.to_protobuf(response->mutable_status()); - st = _exec_env->vstream_mgr()->transmit_block(request, &done); - if (!st.ok()) { - LOG(WARNING) << "transmit_block failed, message=" << st.get_error_msg() - << ", fragment_instance_id=" << print_id(request->finst_id()) - << ", node=" << request->node_id(); + if (extract_st.ok()) { + st = _exec_env->vstream_mgr()->transmit_block(request, &done); + if (!st.ok()) { + LOG(WARNING) << "transmit_block failed, message=" << st.get_error_msg() + << ", fragment_instance_id=" << print_id(request->finst_id()) + << ", node=" << request->node_id(); + } + } else { + st = extract_st; } if (done != nullptr) { st.to_protobuf(response->mutable_status()); diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 550773b950..0ca63aa5f9 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -41,6 +41,11 @@ public: ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done) override; + void transmit_data_by_http(::google::protobuf::RpcController* controller, + const ::doris::PEmptyRequest* request, + ::doris::PTransmitDataResult* response, + ::google::protobuf::Closure* done) override; + void exec_plan_fragment(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* result, @@ -74,6 +79,11 @@ public: PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) override; + void tablet_writer_add_batch_by_http(google::protobuf::RpcController* controller, + const ::doris::PEmptyRequest* request, + PTabletWriterAddBatchResult* response, + google::protobuf::Closure* done) override; + void tablet_writer_cancel(google::protobuf::RpcController* controller, const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, @@ -105,6 +115,10 @@ public: const ::doris::PTransmitDataParams* request, ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done) override; + 
void transmit_block_by_http(::google::protobuf::RpcController* controller, + const ::doris::PEmptyRequest* request, + ::doris::PTransmitDataResult* response, + ::google::protobuf::Closure* done) override; void send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) override; @@ -132,6 +146,21 @@ private: Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response); + void _transmit_data(::google::protobuf::RpcController* controller, + const ::doris::PTransmitDataParams* request, + ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done, + const Status& extract_st); + + void _transmit_block(::google::protobuf::RpcController* controller, + const ::doris::PTransmitDataParams* request, + ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done, + const Status& extract_st); + + void _tablet_writer_add_batch(google::protobuf::RpcController* controller, + const PTabletWriterAddBatchRequest* request, + PTabletWriterAddBatchResult* response, + google::protobuf::Closure* done); + private: ExecEnv* _exec_env; PriorityThreadPool _tablet_worker_pool; diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index b15a3c6aeb..aef96b6ec6 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -71,9 +71,23 @@ public: } // new one stub and insert into map + auto stub = get_new_client_no_cache(host_port); + _stub_map.try_emplace_l( + host_port, [&stub](typename StubMap<T>::mapped_type& v) { stub = v; }, stub); + return stub; + } + + std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port, + const std::string& protocol = "baidu_std", + const std::string& connect_type = "") { brpc::ChannelOptions options; if constexpr (std::is_same_v<T, PFunctionService_Stub>) { options.protocol = config::function_service_protocol; + } else { + options.protocol = protocol; + } + if 
(connect_type != "") { + options.connection_type = connect_type; } std::unique_ptr<brpc::Channel> channel(new brpc::Channel()); int ret_code = 0; @@ -86,12 +100,7 @@ public: if (ret_code) { return nullptr; } - auto stub = std::make_shared<T>(channel.release(), - google::protobuf::Service::STUB_OWNS_CHANNEL); - _stub_map.try_emplace_l(host_port, - [&stub](typename StubMap<T>::mapped_type& v) { stub = v; }, - stub); - return stub; + return std::make_shared<T>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL); } inline size_t size() { return _stub_map.size(); } diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h index 52dcec1861..0144d87396 100644 --- a/be/src/util/proto_util.h +++ b/be/src/util/proto_util.h @@ -17,10 +17,16 @@ #pragma once -#include "util/stack_util.h" - namespace doris { +// When the tuple/block data is greater than 2G, embed the tuple/block data +// and the request serialization string in the attachment, and use "http" brpc. +// "http"brpc requires that only one of request and attachment be non-null. +// +// 2G: In the default "baidu_std" brpcd, upper limit of the request and attachment length is 2G. +constexpr size_t MIN_HTTP_BRPC_SIZE = (1ULL << 31); + +// TODO(zxy) delete in v1.3 version // Transfer RowBatch in ProtoBuf Request to Controller Attachment. // This can avoid reaching the upper limit of the ProtoBuf Request length (2G), // and it is expected that performance can be improved. @@ -34,6 +40,7 @@ inline void request_row_batch_transfer_attachment(Params* brpc_request, const st closure->cntl.request_attachment().swap(attachment); } +// TODO(zxy) delete in v1.3 version // Transfer Block in ProtoBuf Request to Controller Attachment. // This can avoid reaching the upper limit of the ProtoBuf Request length (2G), // and it is expected that performance can be improved. 
@@ -47,6 +54,7 @@ inline void request_block_transfer_attachment(Params* brpc_request, const std::s closure->cntl.request_attachment().swap(attachment); } +// TODO(zxy) delete in v1.3 version // Controller Attachment transferred to RowBatch in ProtoBuf Request. template <typename Params> inline void attachment_transfer_request_row_batch(const Params* brpc_request, brpc::Controller* cntl) { @@ -59,6 +67,7 @@ inline void attachment_transfer_request_row_batch(const Params* brpc_request, br } } +// TODO(zxy) delete in v1.3 version // Controller Attachment transferred to Block in ProtoBuf Request. template <typename Params> inline void attachment_transfer_request_block(const Params* brpc_request, brpc::Controller* cntl) { @@ -71,4 +80,101 @@ inline void attachment_transfer_request_block(const Params* brpc_request, brpc:: } } +// Embed tuple_data and brpc request serialization string in controller attachment. +template <typename Params, typename Closure> +inline Status request_embed_attachment_contain_tuple(Params* brpc_request, Closure* closure) { + auto row_batch = brpc_request->row_batch(); + Status st = request_embed_attachment(brpc_request, row_batch.tuple_data(), closure); + row_batch.set_tuple_data(""); + return st; +} + +// Embed column_values and brpc request serialization string in controller attachment. +template <typename Params, typename Closure> +inline Status request_embed_attachment_contain_block(Params* brpc_request, Closure* closure) { + auto block = brpc_request->block(); + Status st = request_embed_attachment(brpc_request, block.column_values(), closure); + block.set_column_values(""); + return st; +} + +template <typename Params, typename Closure> +inline Status request_embed_attachment(Params* brpc_request, const std::string& data, + Closure* closure) { + butil::IOBuf attachment; + + // step1: serialize brpc_request to string, and append to attachment. 
+ std::string req_str; + brpc_request->SerializeToString(&req_str); + int64_t req_str_size = req_str.size(); + attachment.append(&req_str_size, sizeof(req_str_size)); + attachment.append(req_str); + + // step2: append data to attachment and put it in the closure. + int64_t data_size = data.size(); + attachment.append(&data_size, sizeof(data_size)); + try { + attachment.append(data); + } catch (...) { + std::exception_ptr p = std::current_exception(); + LOG(WARNING) << "Try to alloc " << data_size + << " bytes for append data to attachment failed. " + << (p ? p.__cxa_exception_type()->name() : "null"); + return Status::MemoryAllocFailed( + fmt::format("request embed attachment failed to memcpy {} bytes", data_size)); + } + // step3: attachment add to closure. + closure->cntl.request_attachment().swap(attachment); + return Status::OK(); +} + +// Extract the brpc request and tuple data from the controller attachment, +// and put the tuple data into the request. +template <typename Params> +inline Status attachment_extract_request_contain_tuple(const Params* brpc_request, + brpc::Controller* cntl) { + Params* req = const_cast<Params*>(brpc_request); + auto rb = req->mutable_row_batch(); + return attachment_extract_request(req, cntl, rb->mutable_tuple_data()); +} + +// Extract the brpc request and block from the controller attachment, +// and put the block into the request. +template <typename Params> +inline Status attachment_extract_request_contain_block(const Params* brpc_request, + brpc::Controller* cntl) { + Params* req = const_cast<Params*>(brpc_request); + auto block = req->mutable_block(); + return attachment_extract_request(req, cntl, block->mutable_column_values()); +} + +template <typename Params> +inline Status attachment_extract_request(const Params* brpc_request, brpc::Controller* cntl, + std::string* data) { + const butil::IOBuf& io_buf = cntl->request_attachment(); + + // step1: deserialize request string to brpc_request from attachment. 
+ int64_t req_str_size; + io_buf.copy_to(&req_str_size, sizeof(req_str_size), 0); + std::string req_str; + io_buf.copy_to(&req_str, req_str_size, sizeof(req_str_size)); + Params* req = const_cast<Params*>(brpc_request); + req->ParseFromString(req_str); + + // step2: extract data from attachment. + int64_t data_size; + io_buf.copy_to(&data_size, sizeof(data_size), sizeof(req_str_size) + req_str_size); + try { + io_buf.copy_to(data, data_size, sizeof(data_size) + sizeof(req_str_size) + req_str_size); + } catch (...) { + std::exception_ptr p = std::current_exception(); + LOG(WARNING) << "Try to alloc " << data_size + << " bytes for extract data from attachment failed. " + << (p ? p.__cxa_exception_type()->name() : "null"); + return Status::MemoryAllocFailed( + fmt::format("attachment extract request failed to memcpy {} bytes", data_size)); + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 0d275f6604..bcc06b51b1 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -721,7 +721,7 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee } Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes, - std::string* allocated_buf) const { + bool allow_transfer_large_data) const { // calc uncompressed size for allocation size_t content_uncompressed_size = 0; for (const auto& c : *this) { @@ -733,8 +733,20 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp // serialize data values // when data type is HLL, content_uncompressed_size maybe larger than real size. - allocated_buf->resize(content_uncompressed_size); - char* buf = allocated_buf->data(); + std::string* column_values = nullptr; + try { + column_values = pblock->mutable_column_values(); + column_values->resize(content_uncompressed_size); + } catch (...) 
{ + std::exception_ptr p = std::current_exception(); + std::string msg = fmt::format( + "Try to alloc {} bytes for pblock column values failed. reason {}", + content_uncompressed_size, p ? p.__cxa_exception_type()->name() : "null"); + LOG(WARNING) << msg; + return Status::BufferAllocFailed(msg); + } + char* buf = column_values->data(); + for (const auto& c : *this) { buf = c.type->serialize(*(c.column), buf); } @@ -750,12 +762,12 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp size_t compressed_size = 0; char* compressed_output = compression_scratch.data(); - snappy::RawCompress(allocated_buf->data(), content_uncompressed_size, compressed_output, + snappy::RawCompress(column_values->data(), content_uncompressed_size, compressed_output, &compressed_size); if (LIKELY(compressed_size < content_uncompressed_size)) { compression_scratch.resize(compressed_size); - allocated_buf->swap(compression_scratch); + column_values->swap(compression_scratch); pblock->set_compressed(true); *compressed_bytes = compressed_size; } else { @@ -765,7 +777,10 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp VLOG_ROW << "uncompressed size: " << content_uncompressed_size << ", compressed size: " << compressed_size; } - + if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) { + return Status::InternalError(fmt::format( + "The block is large than 2GB({}), can not send by Protobuf.", *compressed_bytes)); + } return Status::OK(); } diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 9bc86c4533..31b3602a8b 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -248,7 +248,8 @@ public: } // serialize block to PBlock - Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes, std::string* allocated_buf) const; + Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes, + bool allow_transfer_large_data 
= false) const; // serialize block to PRowbatch void serialize(RowBatch*, const RowDescriptor&); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index cbadfc931f..0f1cf57c7f 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -50,6 +50,11 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) { _finst_id.set_hi(_fragment_instance_id.hi); _finst_id.set_lo(_fragment_instance_id.lo); _brpc_request.set_allocated_finst_id(&_finst_id); + + _query_id.set_hi(state->query_id().hi); + _query_id.set_lo(state->query_id().lo); + _brpc_request.set_allocated_query_id(&_query_id); + _brpc_request.set_node_id(_dest_node_id); _brpc_request.set_sender_id(_parent->_sender_id); _brpc_request.set_be_number(_be_number); @@ -67,6 +72,7 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) { // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0" // so the empty channel not need call function close_internal() _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1); + _state = state; return Status::OK(); } @@ -138,13 +144,27 @@ Status VDataStreamSender::Channel::send_block(PBlock* block, bool eos) { _closure->ref(); _closure->cntl.set_timeout_ms(_brpc_timeout_ms); - if (_brpc_request.has_block()) { - request_block_transfer_attachment<PTransmitDataParams, - RefCountClosure<PTransmitDataResult>>(&_brpc_request, _parent->_column_values_buffer, - _closure); + if (_parent->_transfer_large_data_by_brpc && _brpc_request.has_block() && + _brpc_request.block().has_column_values() && + _brpc_request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) { + Status st = request_embed_attachment_contain_block<PTransmitDataParams, + RefCountClosure<PTransmitDataResult>>( + &_brpc_request, _closure); + RETURN_IF_ERROR(st); + std::string brpc_url = + fmt::format("http://{}:{}", _brpc_dest_addr.hostname, _brpc_dest_addr.port); + std::shared_ptr<PBackendService_Stub> 
_brpc_http_stub = + _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, + "http"); + _closure->cntl.http_request().uri() = + brpc_url + "/PInternalServiceImpl/transmit_block_by_http"; + _closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST); + _closure->cntl.http_request().set_content_type("application/json"); + _brpc_http_stub->transmit_block_by_http(&_closure->cntl, NULL, &_closure->result, _closure); + } else { + _closure->cntl.http_request().Clear(); + _brpc_stub->transmit_block(&_closure->cntl, &_brpc_request, &_closure->result, _closure); } - - _brpc_stub->transmit_block(&_closure->cntl, &_brpc_request, &_closure->result, _closure); if (block != nullptr) { _brpc_request.release_block(); } @@ -259,7 +279,8 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD _serialize_batch_timer(nullptr), _bytes_sent_counter(nullptr), _local_bytes_send_counter(nullptr), - _dest_node_id(sink.dest_node_id) { + _dest_node_id(sink.dest_node_id), + _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || sink.output_partition.type == TPartitionType::HASH_PARTITIONED || @@ -563,7 +584,8 @@ Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int num_rece SCOPED_TIMER(_serialize_batch_timer); dest->Clear(); size_t uncompressed_bytes = 0, compressed_bytes = 0; - RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes, &_column_values_buffer)); + RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes, + _transfer_large_data_by_brpc)); COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers); COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 67faae452b..09c9549a5b 100644 --- 
a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -144,10 +144,8 @@ protected: // Identifier of the destination plan node. PlanNodeId _dest_node_id; - // This buffer is used to store the serialized block data - // The data in the buffer is copied to the attachment of the brpc when it is sent, - // to avoid an extra pb serialization in the brpc. - std::string _column_values_buffer; + // User can change this config at runtime, avoid it being modified during query or loading process. + bool _transfer_large_data_by_brpc = false; }; // TODO: support local exechange @@ -188,6 +186,7 @@ public: } // release this before request desctruct _brpc_request.release_finst_id(); + _brpc_request.release_query_id(); } // Initialize channel. @@ -279,6 +278,7 @@ private: TNetworkAddress _brpc_dest_addr; PUniqueId _finst_id; + PUniqueId _query_id; PBlock _pb_block; PTransmitDataParams _brpc_request; std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr; @@ -287,6 +287,7 @@ private: // whether the dest can be treated as query statistics transfer chain. 
bool _is_transfer_chain; bool _send_query_statistics_with_every_batch; + RuntimeState* _state; size_t _capacity; bool _is_local; diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 6de189099e..b3c948688c 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -17,6 +17,8 @@ #include "vec/sink/vtablet_sink.h" +#include "util/brpc_client_cache.h" +#include "util/debug/sanitizer_scopes.h" #include "util/doris_metrics.h" #include "vec/core/block.h" #include "vec/exprs/vexpr.h" diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp index 3d8366d1d3..a6f4fef966 100644 --- a/be/test/vec/core/block_test.cpp +++ b/be/test/vec/core/block_test.cpp @@ -157,14 +157,13 @@ TEST(BlockTest, RowBatchCovertToBlock) { void block_to_pb(const vectorized::Block& block, PBlock* pblock) { size_t uncompressed_bytes = 0; size_t compressed_bytes = 0; - std::string column_values_buffer; - Status st = block.serialize(pblock, &uncompressed_bytes, &compressed_bytes, &column_values_buffer); + Status st = block.serialize(pblock, &uncompressed_bytes, &compressed_bytes); EXPECT_TRUE(st.ok()); EXPECT_TRUE(uncompressed_bytes >= compressed_bytes); - EXPECT_EQ(compressed_bytes, column_values_buffer.size()); - pblock->set_column_values(column_values_buffer); - - const vectorized::ColumnWithTypeAndName& type_and_name = block.get_columns_with_type_and_name()[0]; + EXPECT_EQ(compressed_bytes, pblock->column_values().size()); + + const vectorized::ColumnWithTypeAndName& type_and_name = + block.get_columns_with_type_and_name()[0]; EXPECT_EQ(type_and_name.name, pblock->column_metas()[0].name()); } diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index b1d9f9c5eb..8afd14f61a 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -186,11 +186,11 @@ This configuration is mainly used to modify the 
parameter `socket_max_unwritten_ Sometimes the query fails and an error message of `The server is overcrowded` will appear in the BE log. This means there are too many messages to buffer at the sender side, which may happen when the SQL needs to send large bitmap value. You can avoid this error by increasing the configuration. -### `transfer_data_by_brpc_attachment` +### `transfer_large_data_by_brpc` * Type: bool -* Description: This configuration is used to control whether to transfer the RowBatch in the ProtoBuf Request to the Controller Attachment and then send it through brpc. When the length of ProtoBuf Request exceeds 2G, an error will be reported: Bad request, error_text=[E1003]Fail to compress request, Putting RowBatch in Controller Attachment will be faster and avoid this error. -* Default value: false +* Description: This configuration is used to control whether to serialize the protoBuf request and embed the Tuple/Block data into the controller attachment and send it through http brpc when the length of the Tuple/Block data is greater than 1.8G. This avoids the error raised when the length of the protoBuf request exceeds 2G: Bad request, error_text=[E1003]Fail to compress request. In past versions, after putting Tuple/Block data in the attachment, it was sent through the default baidu_std brpc, [...] 
+* Default value: true ### `brpc_num_threads` diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index 1dd7e7a015..abbfc3b20b 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -179,11 +179,11 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in 有时查询失败,BE 日志中会出现 `The server is overcrowded` 的错误信息,表示连接上有过多的未发送数据。当查询需要发送较大的bitmap字段时,可能会遇到该问题,此时可能通过调大该配置避免该错误。 -### `transfer_data_by_brpc_attachment` +### `transfer_large_data_by_brpc` * 类型: bool -* 描述:该配置用来控制是否将ProtoBuf Request中的RowBatch转移到Controller Attachment后通过brpc发送。ProtoBuf Request的长度超过2G时会报错: Bad request, error_text=[E1003]Fail to compress request,将RowBatch放到Controller Attachment中将更快且避免这个错误。 -* 默认值:false +* 描述:该配置用来控制是否在 Tuple/Block data 长度大于1.8G时,将 protoBuf request 序列化后和 Tuple/Block data 一起嵌入到 controller attachment 后通过 http brpc 发送。为了避免 protoBuf request 的长度超过2G时的错误:Bad request, error_text=[E1003]Fail to compress request。在过去的版本中,曾将 Tuple/Block data 放入 attachment 后通过默认的 baidu_std brpc 发送,但 attachment 超过2G时将被截断,通过 http brpc 发送不存在2G的限制。 +* 默认值:true ### `brpc_num_threads` diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 45662c10cd..9ce6194ddf 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -46,6 +46,7 @@ message PTransmitDataParams { optional PBlock block = 9; // transfer the RowBatch to the Controller Attachment optional bool transfer_by_attachment = 10 [default = false]; + optional PUniqueId query_id = 11; }; message PTransmitDataResult { @@ -431,8 +432,11 @@ message PResetRPCChannelResponse { repeated string channels = 2; }; +message PEmptyRequest {}; + service PBackendService { rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult); + rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult); // If #fragments of a query is < 3, use 
exec_plan_fragment directly. // If #fragments of a query is >=3, use exec_plan_fragment_prepare + exec_plan_fragment_start rpc exec_plan_fragment(PExecPlanFragmentRequest) returns (PExecPlanFragmentResult); @@ -442,6 +446,7 @@ service PBackendService { rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult); rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult); rpc tablet_writer_add_batch(PTabletWriterAddBatchRequest) returns (PTabletWriterAddBatchResult); + rpc tablet_writer_add_batch_by_http(PEmptyRequest) returns (PTabletWriterAddBatchResult); rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult); rpc get_info(PProxyRequest) returns (PProxyResult); rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse); @@ -454,6 +459,7 @@ service PBackendService { rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse); rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult); rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult); + rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult); rpc check_rpc_channel(PCheckRPCChannelRequest) returns (PCheckRPCChannelResponse); rpc reset_rpc_channel(PResetRPCChannelRequest) returns (PResetRPCChannelResponse); rpc hand_shake(PHandShakeRequest) returns (PHandShakeResponse); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
