morningman commented on code in PR #9803:
URL: https://github.com/apache/incubator-doris/pull/9803#discussion_r888771409
##########
be/src/exec/tablet_sink.cpp:
##########
@@ -515,14 +513,31 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
CHECK(_pending_batches_num == 0) << _pending_batches_num;
}
- if (_parent->_transfer_data_by_brpc_attachment && request.has_row_batch())
{
- request_row_batch_transfer_attachment<PTabletWriterAddBatchRequest,
-
ReusableClosure<PTabletWriterAddBatchResult>>(
- &request, _tuple_data_buffer, _add_batch_closure);
+ if (config::transfer_large_data_by_brpc && request.has_row_batch() &&
Review Comment:
User can change this config at runtime. So better to save its value when
creating tablet sink.
Otherwise, it may changed during loading process.
##########
be/src/runtime/data_stream_sender.cpp:
##########
@@ -149,12 +156,28 @@ Status DataStreamSender::Channel::send_batch(PRowBatch*
batch, bool eos) {
_closure->ref();
_closure->cntl.set_timeout_ms(_brpc_timeout_ms);
- if (_parent->_transfer_data_by_brpc_attachment &&
_brpc_request.has_row_batch()) {
- request_row_batch_transfer_attachment<PTransmitDataParams,
-
RefCountClosure<PTransmitDataResult>>(
- &_brpc_request, _parent->_tuple_data_buffer, _closure);
+ if (config::transfer_large_data_by_brpc && _brpc_request.has_row_batch() &&
Review Comment:
Save `transfer_large_data_by_brpc` when creating data stream sender
##########
be/src/service/internal_service.cpp:
##########
@@ -66,19 +66,53 @@ void
PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_b
PTransmitDataResult* response,
google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
- VLOG_ROW << "transmit data: fragment_instance_id=" <<
print_id(request->finst_id())
- << " node=" << request->node_id();
+ // TODO(zxy) delete in 1.2 version
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
attachment_transfer_request_row_batch<PTransmitDataParams>(request, cntl);
+
+ _transmit_data(cntl_base, request, response, done, Status::OK());
+}
+
+void
PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController*
cntl_base,
+ const PEchoRequest* request,
+ PTransmitDataResult* response,
+ google::protobuf::Closure*
done) {
+ SCOPED_SWITCH_BTHREAD();
+ PTransmitDataParams* request_raw = new PTransmitDataParams();
Review Comment:
Memory leak?
##########
be/src/service/internal_service.cpp:
##########
@@ -122,20 +156,44 @@ void
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
const
PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
google::protobuf::Closure*
done) {
+ // TODO(zxy) delete in 1.2 version
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+ attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request,
cntl);
+
+ _tablet_writer_add_block(cntl_base, request, response, done);
+}
+
+void PInternalServiceImpl::tablet_writer_add_block_by_http(
+ google::protobuf::RpcController* cntl_base, const
::doris::PEchoRequest* request,
+ PTabletWriterAddBlockResult* response, google::protobuf::Closure*
done) {
+ PTabletWriterAddBlockRequest* request_raw = new
PTabletWriterAddBlockRequest();
Review Comment:
Memory leak?
##########
be/src/service/internal_service.cpp:
##########
@@ -66,19 +66,53 @@ void
PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_b
PTransmitDataResult* response,
google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
- VLOG_ROW << "transmit data: fragment_instance_id=" <<
print_id(request->finst_id())
- << " node=" << request->node_id();
+ // TODO(zxy) delete in 1.2 version
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
attachment_transfer_request_row_batch<PTransmitDataParams>(request, cntl);
+
+ _transmit_data(cntl_base, request, response, done, Status::OK());
+}
+
+void
PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController*
cntl_base,
+ const PEchoRequest* request,
+ PTransmitDataResult* response,
+ google::protobuf::Closure*
done) {
+ SCOPED_SWITCH_BTHREAD();
+ PTransmitDataParams* request_raw = new PTransmitDataParams();
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+ Status st =
attachment_extract_request_contain_tuple<PTransmitDataParams>(request_raw,
cntl);
+ _transmit_data(cntl_base, request_raw, response, done, st);
+}
+
+void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController*
cntl_base,
+ const PTransmitDataParams* request,
+ PTransmitDataResult* response,
+ google::protobuf::Closure* done,
Status extract_st) {
Review Comment:
```suggestion
google::protobuf::Closure* done,
const Status& extract_st) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]