github-actions[bot] commented on code in PR #16635:
URL: https://github.com/apache/doris/pull/16635#discussion_r1103619001


##########
be/src/service/internal_service.cpp:
##########
@@ -163,94 +200,103 @@
     st.to_protobuf(response->mutable_status());
 }
 
-void 
PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController*
 cntl_base,
+void 
PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController*
 controller,
                                                       const 
PExecPlanFragmentRequest* request,
                                                       PExecPlanFragmentResult* 
response,
                                                       
google::protobuf::Closure* done) {
-    exec_plan_fragment(cntl_base, request, response, done);
+    DorisMetrics::instance()->exec_plan_fragment_prepare->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        exec_plan_fragment(controller, request, response, done);
+    });
 }
 
 void 
PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* 
controller,
                                                     const 
PExecPlanFragmentStartRequest* request,
                                                     PExecPlanFragmentResult* 
result,
                                                     google::protobuf::Closure* 
done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", 
controller);
-    auto scope = OpentelemetryScope {span};
-    brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->fragment_mgr()->start_query_execution(request);
-    st.to_protobuf(result->mutable_status());
+    DorisMetrics::instance()->exec_plan_fragment_start->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        auto span = 
telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+        auto scope = OpentelemetryScope {span};
+        brpc::ClosureGuard closure_guard(done);
+        auto st = _exec_env->fragment_mgr()->start_query_execution(request);
+        st.to_protobuf(result->mutable_status());
+    });
 }
 
-void 
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* 
controller,
                                                    const 
PTabletWriterAddBlockRequest* request,
                                                    
PTabletWriterAddBlockResult* response,
                                                    google::protobuf::Closure* 
done) {
     // TODO(zxy) delete in 1.2 version
-    google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, 
cntl);
-
-    _tablet_writer_add_block(cntl_base, request, response, new_done);
+    DorisMetrics::instance()->tablet_writer_add_block->increment(1);
+    _heavy_work_pool.offer([this, controller, request, response, done]() {
+        google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        
attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
+        _tablet_writer_add_block(controller, request, response, new_done);
+    });
 }
 
 void PInternalServiceImpl::tablet_writer_add_block_by_http(
-        google::protobuf::RpcController* cntl_base, const 
::doris::PEmptyRequest* request,
+        google::protobuf::RpcController* controller, const 
::doris::PEmptyRequest* request,
         PTabletWriterAddBlockResult* response, google::protobuf::Closure* 
done) {
-    PTabletWriterAddBlockRequest* new_request = new 
PTabletWriterAddBlockRequest();
-    google::protobuf::Closure* new_done =
-            new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, 
done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    Status st = 
attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request,
-                                                                               
        cntl);
-    if (st.ok()) {
-        _tablet_writer_add_block(cntl_base, new_request, response, new_done);
-    } else {
-        st.to_protobuf(response->mutable_status());
-    }
+    DorisMetrics::instance()->tablet_writer_add_block_by_http->increment(1);
+    _heavy_work_pool.offer([this, controller, request, response, done]() {

Review Comment:
   warning: lambda capture 'request' is not used [clang-diagnostic-unused-lambda-capture]
   
   ```suggestion
       _heavy_work_pool.offer([this, controller, response, done]() {
   ```
   



##########
be/src/service/internal_service.cpp:
##########
@@ -134,22 +167,26 @@ void 
PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c
                                               const PTabletWriterOpenRequest* 
request,
                                               PTabletWriterOpenResult* 
response,
                                               google::protobuf::Closure* done) 
{
-    VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << 
request->index_id()
-             << ", txn_id=" << request->txn_id();
-    brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->load_channel_mgr()->open(*request);
-    if (!st.ok()) {
-        LOG(WARNING) << "load channel open failed, message=" << st << ", id=" 
<< request->id()
-                     << ", index_id=" << request->index_id() << ", txn_id=" << 
request->txn_id();
-    }
-    st.to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->tablet_writer_open->increment(1);
+    _light_work_pool.offer([this, request, response, done]() {

Review Comment:
   warning: lambda capture 'controller' is not used [clang-diagnostic-unused-lambda-capture]
   
   ```suggestion
       _light_work_pool.offer([this, request, response, done]() {
   ```
   



##########
be/src/service/internal_service.cpp:
##########
@@ -281,315 +327,354 @@
     }
 }
 
-void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* 
controller,
                                                 const 
PCancelPlanFragmentRequest* request,
                                                 PCancelPlanFragmentResult* 
result,
                                                 google::protobuf::Closure* 
done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", 
cntl_base);
-    auto scope = OpentelemetryScope {span};
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId tid;
-    tid.__set_hi(request->finst_id().hi());
-    tid.__set_lo(request->finst_id().lo());
-
-    Status st = Status::OK();
-    if (request->has_cancel_reason()) {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
-                  << ", reason: " << request->cancel_reason();
-        _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
-    } else {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
-        _exec_env->fragment_mgr()->cancel(tid);
-    }
+    DorisMetrics::instance()->cancel_plan_fragment->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        auto span = 
telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+        auto scope = OpentelemetryScope {span};
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId tid;
+        tid.__set_hi(request->finst_id().hi());
+        tid.__set_lo(request->finst_id().lo());
+
+        Status st = Status::OK();
+        if (request->has_cancel_reason()) {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid)
+                      << ", reason: " << request->cancel_reason();
+            _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+        } else {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid);
+            _exec_env->fragment_mgr()->cancel(tid);
+        }
 
-    // TODO: the logic seems useless, cancel only return Status::OK. remove it
-    st.to_protobuf(result->mutable_status());
+        // TODO: the logic seems useless, cancel only return Status::OK. 
remove it
+        st.to_protobuf(result->mutable_status());
+    });
 }
 
-void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* 
controller,
                                       const PFetchDataRequest* request, 
PFetchDataResult* result,
                                       google::protobuf::Closure* done) {
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
-    _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    DorisMetrics::instance()->fetch_data->increment(1);
+    _heavy_work_pool.offer([this, controller, request, result, done]() {
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+        _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    });
 }
 
 void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* 
controller,
                                               const PFetchTableSchemaRequest* 
request,
                                               PFetchTableSchemaResult* result,
                                               google::protobuf::Closure* done) 
{
-    VLOG_RPC << "fetch table schema";
-    brpc::ClosureGuard closure_guard(done);
-    TFileScanRange file_scan_range;
-    Status st = Status::OK();
-    {
-        const uint8_t* buf = (const 
uint8_t*)(request->file_scan_range().data());
-        uint32_t len = request->file_scan_range().size();
-        st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+    DorisMetrics::instance()->fetch_table_schema->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TFileScanRange file_scan_range;
+        Status st = Status::OK();
+        {
+            const uint8_t* buf = (const 
uint8_t*)(request->file_scan_range().data());
+            uint32_t len = request->file_scan_range().size();
+            st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+            if (!st.ok()) {
+                LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+                st.to_protobuf(result->mutable_status());
+                return;
+            }
+        }
+        if (file_scan_range.__isset.ranges == false) {
+            st = Status::InternalError("can not get TFileRangeDesc.");
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        if (file_scan_range.__isset.params == false) {
+            st = Status::InternalError("can not get TFileScanRangeParams.");
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        const TFileRangeDesc& range = file_scan_range.ranges.at(0);
+        const TFileScanRangeParams& params = file_scan_range.params;
+
+        std::unique_ptr<vectorized::GenericReader> reader(nullptr);
+        std::unique_ptr<RuntimeProfile> profile(new 
RuntimeProfile("FetchTableSchema"));
+        IOContext io_ctx;
+        FileCacheStatistics file_cache_statis;
+        io_ctx.file_cache_stats = &file_cache_statis;
+        switch (params.format_type) {
+        case TFileFormatType::FORMAT_CSV_PLAIN:
+        case TFileFormatType::FORMAT_CSV_GZ:
+        case TFileFormatType::FORMAT_CSV_BZ2:
+        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+        case TFileFormatType::FORMAT_CSV_LZOP:
+        case TFileFormatType::FORMAT_CSV_DEFLATE: {
+            // file_slots is no use
+            std::vector<SlotDescriptor*> file_slots;
+            reader.reset(
+                    new vectorized::CsvReader(profile.get(), params, range, 
file_slots, &io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_PARQUET: {
+            reader.reset(new vectorized::ParquetReader(params, range, 
&io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_ORC: {
+            std::vector<std::string> column_names;
+            reader.reset(new vectorized::OrcReader(params, range, 
column_names, "", &io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_JSON: {
+            std::vector<SlotDescriptor*> file_slots;
+            reader.reset(new vectorized::NewJsonReader(profile.get(), params, 
range, file_slots,
+                                                       &io_ctx));
+            break;
+        }
+        default:
+            st = Status::InternalError("Not supported file format in fetch 
table schema: {}",
+                                       params.format_type);
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        std::vector<std::string> col_names;
+        std::vector<TypeDescriptor> col_types;
+        st = reader->get_parsed_schema(&col_names, &col_types);
         if (!st.ok()) {
             LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
             st.to_protobuf(result->mutable_status());
             return;
         }
-    }
-    if (file_scan_range.__isset.ranges == false) {
-        st = Status::InternalError("can not get TFileRangeDesc.");
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    if (file_scan_range.__isset.params == false) {
-        st = Status::InternalError("can not get TFileScanRangeParams.");
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    const TFileRangeDesc& range = file_scan_range.ranges.at(0);
-    const TFileScanRangeParams& params = file_scan_range.params;
-
-    std::unique_ptr<vectorized::GenericReader> reader(nullptr);
-    std::unique_ptr<RuntimeProfile> profile(new 
RuntimeProfile("FetchTableSchema"));
-    IOContext io_ctx;
-    FileCacheStatistics file_cache_statis;
-    io_ctx.file_cache_stats = &file_cache_statis;
-    switch (params.format_type) {
-    case TFileFormatType::FORMAT_CSV_PLAIN:
-    case TFileFormatType::FORMAT_CSV_GZ:
-    case TFileFormatType::FORMAT_CSV_BZ2:
-    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
-    case TFileFormatType::FORMAT_CSV_LZOP:
-    case TFileFormatType::FORMAT_CSV_DEFLATE: {
-        // file_slots is no use
-        std::vector<SlotDescriptor*> file_slots;
-        reader.reset(new vectorized::CsvReader(profile.get(), params, range, 
file_slots, &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_PARQUET: {
-        reader.reset(new vectorized::ParquetReader(params, range, &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_ORC: {
-        std::vector<std::string> column_names;
-        reader.reset(new vectorized::OrcReader(params, range, column_names, 
"", &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_JSON: {
-        std::vector<SlotDescriptor*> file_slots;
-        reader.reset(
-                new vectorized::NewJsonReader(profile.get(), params, range, 
file_slots, &io_ctx));
-        break;
-    }
-    default:
-        st = Status::InternalError("Not supported file format in fetch table 
schema: {}",
-                                   params.format_type);
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    std::vector<std::string> col_names;
-    std::vector<TypeDescriptor> col_types;
-    st = reader->get_parsed_schema(&col_names, &col_types);
-    if (!st.ok()) {
-        LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+        result->set_column_nums(col_names.size());
+        for (size_t idx = 0; idx < col_names.size(); ++idx) {
+            result->add_column_names(col_names[idx]);
+        }
+        for (size_t idx = 0; idx < col_types.size(); ++idx) {
+            PTypeDesc* type_desc = result->add_column_types();
+            col_types[idx].to_protobuf(type_desc);
+        }
         st.to_protobuf(result->mutable_status());
-        return;
-    }
-    result->set_column_nums(col_names.size());
-    for (size_t idx = 0; idx < col_names.size(); ++idx) {
-        result->add_column_names(col_names[idx]);
-    }
-    for (size_t idx = 0; idx < col_types.size(); ++idx) {
-        PTypeDesc* type_desc = result->add_column_types();
-        col_types[idx].to_protobuf(type_desc);
-    }
-    st.to_protobuf(result->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::get_info(google::protobuf::RpcController* 
controller,
                                     const PProxyRequest* request, 
PProxyResult* response,
                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    // PProxyRequest is defined in gensrc/proto/internal_service.proto
-    // Currently it supports 2 kinds of requests:
-    // 1. get all kafka partition ids for given topic
-    // 2. get all kafka partition offsets for given topic and timestamp.
-    if (request->has_kafka_meta_request()) {
-        const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
-        if (!kafka_request.partition_id_for_latest_offsets().empty()) {
-            // get latest offsets for specified partition ids
-            std::vector<PIntegerPair> partition_offsets;
-            Status st = _exec_env->routine_load_task_executor()
-                                ->get_kafka_latest_offsets_for_partitions(
-                                        request->kafka_meta_request(), 
&partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+    DorisMetrics::instance()->get_info->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        // PProxyRequest is defined in gensrc/proto/internal_service.proto
+        // Currently it supports 2 kinds of requests:
+        // 1. get all kafka partition ids for given topic
+        // 2. get all kafka partition offsets for given topic and timestamp.
+        if (request->has_kafka_meta_request()) {
+            const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
+            if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+                // get latest offsets for specified partition ids
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_latest_offsets_for_partitions(
+                                            request->kafka_meta_request(), 
&partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else if (!kafka_request.offset_times().empty()) {
-            // if offset_times() has elements, which means this request is to 
get offset by timestamp.
-            std::vector<PIntegerPair> partition_offsets;
-            Status st =
-                    
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
-                            request->kafka_meta_request(), &partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else if (!kafka_request.offset_times().empty()) {
+                // if offset_times() has elements, which means this request is 
to get offset by timestamp.
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_partition_offsets_for_times(
+                                            request->kafka_meta_request(), 
&partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else {
-            // get partition ids of topic
-            std::vector<int32_t> partition_ids;
-            Status st = 
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
-                    request->kafka_meta_request(), &partition_ids);
-            if (st.ok()) {
-                PKafkaMetaProxyResult* kafka_result = 
response->mutable_kafka_meta_result();
-                for (int32_t id : partition_ids) {
-                    kafka_result->add_partition_ids(id);
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else {
+                // get partition ids of topic
+                std::vector<int32_t> partition_ids;
+                Status st = 
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+                        request->kafka_meta_request(), &partition_ids);
+                if (st.ok()) {
+                    PKafkaMetaProxyResult* kafka_result = 
response->mutable_kafka_meta_result();
+                    for (int32_t id : partition_ids) {
+                        kafka_result->add_partition_ids(id);
+                    }
                 }
+                st.to_protobuf(response->mutable_status());
+                return;
             }
-            st.to_protobuf(response->mutable_status());
-            return;
         }
-    }
-    Status::OK().to_protobuf(response->mutable_status());
+        Status::OK().to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::update_cache(google::protobuf::RpcController* 
controller,
                                         const PUpdateCacheRequest* request,
                                         PCacheResponse* response, 
google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->update(request, response);
+    DorisMetrics::instance()->update_cache->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->update(request, response);
+    });
 }
 
 void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* 
controller,
                                        const PFetchCacheRequest* request, 
PFetchCacheResult* result,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->fetch(request, result);
+    DorisMetrics::instance()->fetch_cache->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->fetch(request, result);
+    });
 }
 
 void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* 
controller,
                                        const PClearCacheRequest* request, 
PCacheResponse* response,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->clear(request, response);
+    DorisMetrics::instance()->clear_cache->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->clear(request, response);
+    });
 }
 
 void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* 
controller,
                                         const ::doris::PMergeFilterRequest* 
request,
                                         ::doris::PMergeFilterResponse* 
response,
                                         ::google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
-    if (!st.ok()) {
-        LOG(WARNING) << "merge meet error" << st.to_string();
-    }
-    st.to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->merge_filter->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "merge meet error" << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* 
controller,
                                         const ::doris::PPublishFilterRequest* 
request,
                                         ::doris::PPublishFilterResponse* 
response,
                                         ::google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    UniqueId unique_id(request->query_id());
-    VLOG_NOTICE << "rpc apply_filter recv";
-    Status st = _exec_env->fragment_mgr()->apply_filter(request, 
&zero_copy_input_stream);
-    if (!st.ok()) {
-        LOG(WARNING) << "apply filter meet error: " << st.to_string();
-    }
-    st.to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->apply_filter->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        UniqueId unique_id(request->query_id());
+        VLOG_NOTICE << "rpc apply_filter recv";
+        Status st = _exec_env->fragment_mgr()->apply_filter(request, 
&zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "apply filter meet error: " << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::send_data(google::protobuf::RpcController* 
controller,
                                      const PSendDataRequest* request, 
PSendDataResult* response,
                                      google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        for (int i = 0; i < request->data_size(); ++i) {
-            PDataRow* row = new PDataRow();
-            row->CopyFrom(request->data(i));
-            pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
-                                   sizeof(row) + row->ByteSizeLong());
+    DorisMetrics::instance()->send_data->increment(1);
+    _heavy_work_pool.offer([this, request, response, done]() {

Review Comment:
   warning: lambda capture 'controller' is not used [clang-diagnostic-unused-lambda-capture]
   
   ```suggestion
       _heavy_work_pool.offer([this, request, response, done]() {
   ```
   



##########
be/src/service/internal_service.cpp:
##########
@@ -606,31 +691,38 @@
     return FoldConstantExecutor().fold_constant_vexpr(t_request, response);
 }
 
-void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* 
controller,
                                           const PTransmitDataParams* request,
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done) {
-    // TODO(zxy) delete in 1.2 version
-    google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
-
-    _transmit_block(cntl_base, request, response, new_done, Status::OK());
+    DorisMetrics::instance()->transmit_block->increment(1);
+    _heavy_work_pool.offer([this, controller, request, response, done]() {
+        // TODO(zxy) delete in 1.2 version
+        google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
+
+        _transmit_block(controller, request, response, new_done, Status::OK());
+    });
 }
 
-void 
PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* 
controller,
                                                   const PEmptyRequest* request,
                                                   PTransmitDataResult* 
response,
                                                   google::protobuf::Closure* 
done) {
-    PTransmitDataParams* new_request = new PTransmitDataParams();
-    google::protobuf::Closure* new_done =
-            new NewHttpClosure<PTransmitDataParams>(new_request, done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    Status st = 
attachment_extract_request_contain_block<PTransmitDataParams>(new_request, 
cntl);
-    _transmit_block(cntl_base, new_request, response, new_done, st);
+    DorisMetrics::instance()->transmit_block_by_http->increment(1);
+    _heavy_work_pool.offer([this, controller, response, done]() {

Review Comment:
   warning: lambda capture 'request' is not used [clang-diagnostic-unused-lambda-capture]
   
   ```suggestion
       _heavy_work_pool.offer([this, controller, response, done]() {
   ```
   



##########
be/src/service/internal_service.cpp:
##########
@@ -281,315 +327,354 @@
     }
 }
 
-void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* 
controller,
                                                 const 
PCancelPlanFragmentRequest* request,
                                                 PCancelPlanFragmentResult* 
result,
                                                 google::protobuf::Closure* 
done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", 
cntl_base);
-    auto scope = OpentelemetryScope {span};
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId tid;
-    tid.__set_hi(request->finst_id().hi());
-    tid.__set_lo(request->finst_id().lo());
-
-    Status st = Status::OK();
-    if (request->has_cancel_reason()) {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
-                  << ", reason: " << request->cancel_reason();
-        _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
-    } else {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
-        _exec_env->fragment_mgr()->cancel(tid);
-    }
+    DorisMetrics::instance()->cancel_plan_fragment->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        auto span = 
telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+        auto scope = OpentelemetryScope {span};
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId tid;
+        tid.__set_hi(request->finst_id().hi());
+        tid.__set_lo(request->finst_id().lo());
+
+        Status st = Status::OK();
+        if (request->has_cancel_reason()) {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid)
+                      << ", reason: " << request->cancel_reason();
+            _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+        } else {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid);
+            _exec_env->fragment_mgr()->cancel(tid);
+        }
 
-    // TODO: the logic seems useless, cancel only return Status::OK. remove it
-    st.to_protobuf(result->mutable_status());
+        // TODO: the logic seems useless, cancel only return Status::OK. 
remove it
+        st.to_protobuf(result->mutable_status());
+    });
 }
 
-void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* 
controller,
                                       const PFetchDataRequest* request, 
PFetchDataResult* result,
                                       google::protobuf::Closure* done) {
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
-    _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    DorisMetrics::instance()->fetch_data->increment(1);
+    _heavy_work_pool.offer([this, controller, request, result, done]() {
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+        _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    });
 }
 
 void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* 
controller,
                                               const PFetchTableSchemaRequest* 
request,
                                               PFetchTableSchemaResult* result,
                                               google::protobuf::Closure* done) 
{
-    VLOG_RPC << "fetch table schema";
-    brpc::ClosureGuard closure_guard(done);
-    TFileScanRange file_scan_range;
-    Status st = Status::OK();
-    {
-        const uint8_t* buf = (const 
uint8_t*)(request->file_scan_range().data());
-        uint32_t len = request->file_scan_range().size();
-        st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+    DorisMetrics::instance()->fetch_table_schema->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TFileScanRange file_scan_range;
+        Status st = Status::OK();
+        {
+            const uint8_t* buf = (const 
uint8_t*)(request->file_scan_range().data());
+            uint32_t len = request->file_scan_range().size();
+            st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+            if (!st.ok()) {
+                LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+                st.to_protobuf(result->mutable_status());
+                return;
+            }
+        }
+        if (file_scan_range.__isset.ranges == false) {
+            st = Status::InternalError("can not get TFileRangeDesc.");
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        if (file_scan_range.__isset.params == false) {
+            st = Status::InternalError("can not get TFileScanRangeParams.");
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        const TFileRangeDesc& range = file_scan_range.ranges.at(0);
+        const TFileScanRangeParams& params = file_scan_range.params;
+
+        std::unique_ptr<vectorized::GenericReader> reader(nullptr);
+        std::unique_ptr<RuntimeProfile> profile(new 
RuntimeProfile("FetchTableSchema"));
+        IOContext io_ctx;
+        FileCacheStatistics file_cache_statis;
+        io_ctx.file_cache_stats = &file_cache_statis;
+        switch (params.format_type) {
+        case TFileFormatType::FORMAT_CSV_PLAIN:
+        case TFileFormatType::FORMAT_CSV_GZ:
+        case TFileFormatType::FORMAT_CSV_BZ2:
+        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+        case TFileFormatType::FORMAT_CSV_LZOP:
+        case TFileFormatType::FORMAT_CSV_DEFLATE: {
+            // file_slots is no use
+            std::vector<SlotDescriptor*> file_slots;
+            reader.reset(
+                    new vectorized::CsvReader(profile.get(), params, range, 
file_slots, &io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_PARQUET: {
+            reader.reset(new vectorized::ParquetReader(params, range, 
&io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_ORC: {
+            std::vector<std::string> column_names;
+            reader.reset(new vectorized::OrcReader(params, range, 
column_names, "", &io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_JSON: {
+            std::vector<SlotDescriptor*> file_slots;
+            reader.reset(new vectorized::NewJsonReader(profile.get(), params, 
range, file_slots,
+                                                       &io_ctx));
+            break;
+        }
+        default:
+            st = Status::InternalError("Not supported file format in fetch 
table schema: {}",
+                                       params.format_type);
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        std::vector<std::string> col_names;
+        std::vector<TypeDescriptor> col_types;
+        st = reader->get_parsed_schema(&col_names, &col_types);
         if (!st.ok()) {
             LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
             st.to_protobuf(result->mutable_status());
             return;
         }
-    }
-    if (file_scan_range.__isset.ranges == false) {
-        st = Status::InternalError("can not get TFileRangeDesc.");
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    if (file_scan_range.__isset.params == false) {
-        st = Status::InternalError("can not get TFileScanRangeParams.");
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    const TFileRangeDesc& range = file_scan_range.ranges.at(0);
-    const TFileScanRangeParams& params = file_scan_range.params;
-
-    std::unique_ptr<vectorized::GenericReader> reader(nullptr);
-    std::unique_ptr<RuntimeProfile> profile(new 
RuntimeProfile("FetchTableSchema"));
-    IOContext io_ctx;
-    FileCacheStatistics file_cache_statis;
-    io_ctx.file_cache_stats = &file_cache_statis;
-    switch (params.format_type) {
-    case TFileFormatType::FORMAT_CSV_PLAIN:
-    case TFileFormatType::FORMAT_CSV_GZ:
-    case TFileFormatType::FORMAT_CSV_BZ2:
-    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
-    case TFileFormatType::FORMAT_CSV_LZOP:
-    case TFileFormatType::FORMAT_CSV_DEFLATE: {
-        // file_slots is no use
-        std::vector<SlotDescriptor*> file_slots;
-        reader.reset(new vectorized::CsvReader(profile.get(), params, range, 
file_slots, &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_PARQUET: {
-        reader.reset(new vectorized::ParquetReader(params, range, &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_ORC: {
-        std::vector<std::string> column_names;
-        reader.reset(new vectorized::OrcReader(params, range, column_names, 
"", &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_JSON: {
-        std::vector<SlotDescriptor*> file_slots;
-        reader.reset(
-                new vectorized::NewJsonReader(profile.get(), params, range, 
file_slots, &io_ctx));
-        break;
-    }
-    default:
-        st = Status::InternalError("Not supported file format in fetch table 
schema: {}",
-                                   params.format_type);
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    std::vector<std::string> col_names;
-    std::vector<TypeDescriptor> col_types;
-    st = reader->get_parsed_schema(&col_names, &col_types);
-    if (!st.ok()) {
-        LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+        result->set_column_nums(col_names.size());
+        for (size_t idx = 0; idx < col_names.size(); ++idx) {
+            result->add_column_names(col_names[idx]);
+        }
+        for (size_t idx = 0; idx < col_types.size(); ++idx) {
+            PTypeDesc* type_desc = result->add_column_types();
+            col_types[idx].to_protobuf(type_desc);
+        }
         st.to_protobuf(result->mutable_status());
-        return;
-    }
-    result->set_column_nums(col_names.size());
-    for (size_t idx = 0; idx < col_names.size(); ++idx) {
-        result->add_column_names(col_names[idx]);
-    }
-    for (size_t idx = 0; idx < col_types.size(); ++idx) {
-        PTypeDesc* type_desc = result->add_column_types();
-        col_types[idx].to_protobuf(type_desc);
-    }
-    st.to_protobuf(result->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::get_info(google::protobuf::RpcController* 
controller,
                                     const PProxyRequest* request, 
PProxyResult* response,
                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    // PProxyRequest is defined in gensrc/proto/internal_service.proto
-    // Currently it supports 2 kinds of requests:
-    // 1. get all kafka partition ids for given topic
-    // 2. get all kafka partition offsets for given topic and timestamp.
-    if (request->has_kafka_meta_request()) {
-        const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
-        if (!kafka_request.partition_id_for_latest_offsets().empty()) {
-            // get latest offsets for specified partition ids
-            std::vector<PIntegerPair> partition_offsets;
-            Status st = _exec_env->routine_load_task_executor()
-                                ->get_kafka_latest_offsets_for_partitions(
-                                        request->kafka_meta_request(), 
&partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+    DorisMetrics::instance()->get_info->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        // PProxyRequest is defined in gensrc/proto/internal_service.proto
+        // Currently it supports 2 kinds of requests:
+        // 1. get all kafka partition ids for given topic
+        // 2. get all kafka partition offsets for given topic and timestamp.
+        if (request->has_kafka_meta_request()) {
+            const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
+            if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+                // get latest offsets for specified partition ids
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_latest_offsets_for_partitions(
+                                            request->kafka_meta_request(), 
&partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else if (!kafka_request.offset_times().empty()) {
-            // if offset_times() has elements, which means this request is to 
get offset by timestamp.
-            std::vector<PIntegerPair> partition_offsets;
-            Status st =
-                    
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
-                            request->kafka_meta_request(), &partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else if (!kafka_request.offset_times().empty()) {
+                // if offset_times() has elements, which means this request is 
to get offset by timestamp.
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_partition_offsets_for_times(
+                                            request->kafka_meta_request(), 
&partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else {
-            // get partition ids of topic
-            std::vector<int32_t> partition_ids;
-            Status st = 
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
-                    request->kafka_meta_request(), &partition_ids);
-            if (st.ok()) {
-                PKafkaMetaProxyResult* kafka_result = 
response->mutable_kafka_meta_result();
-                for (int32_t id : partition_ids) {
-                    kafka_result->add_partition_ids(id);
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else {
+                // get partition ids of topic
+                std::vector<int32_t> partition_ids;
+                Status st = 
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+                        request->kafka_meta_request(), &partition_ids);
+                if (st.ok()) {
+                    PKafkaMetaProxyResult* kafka_result = 
response->mutable_kafka_meta_result();
+                    for (int32_t id : partition_ids) {
+                        kafka_result->add_partition_ids(id);
+                    }
                 }
+                st.to_protobuf(response->mutable_status());
+                return;
             }
-            st.to_protobuf(response->mutable_status());
-            return;
         }
-    }
-    Status::OK().to_protobuf(response->mutable_status());
+        Status::OK().to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::update_cache(google::protobuf::RpcController* 
controller,
                                         const PUpdateCacheRequest* request,
                                         PCacheResponse* response, 
google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->update(request, response);
+    DorisMetrics::instance()->update_cache->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->update(request, response);
+    });
 }
 
 void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* 
controller,
                                        const PFetchCacheRequest* request, 
PFetchCacheResult* result,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->fetch(request, result);
+    DorisMetrics::instance()->fetch_cache->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->fetch(request, result);
+    });
 }
 
 void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* 
controller,
                                        const PClearCacheRequest* request, 
PCacheResponse* response,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->clear(request, response);
+    DorisMetrics::instance()->clear_cache->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->clear(request, response);
+    });
 }
 
 void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* 
controller,
                                         const ::doris::PMergeFilterRequest* 
request,
                                         ::doris::PMergeFilterResponse* 
response,
                                         ::google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
-    if (!st.ok()) {
-        LOG(WARNING) << "merge meet error" << st.to_string();
-    }
-    st.to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->merge_filter->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "merge meet error" << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* 
controller,
                                         const ::doris::PPublishFilterRequest* 
request,
                                         ::doris::PPublishFilterResponse* 
response,
                                         ::google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    UniqueId unique_id(request->query_id());
-    VLOG_NOTICE << "rpc apply_filter recv";
-    Status st = _exec_env->fragment_mgr()->apply_filter(request, 
&zero_copy_input_stream);
-    if (!st.ok()) {
-        LOG(WARNING) << "apply filter meet error: " << st.to_string();
-    }
-    st.to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->apply_filter->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        UniqueId unique_id(request->query_id());
+        VLOG_NOTICE << "rpc apply_filter recv";
+        Status st = _exec_env->fragment_mgr()->apply_filter(request, 
&zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "apply filter meet error: " << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::send_data(google::protobuf::RpcController* 
controller,
                                      const PSendDataRequest* request, 
PSendDataResult* response,
                                      google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        for (int i = 0; i < request->data_size(); ++i) {
-            PDataRow* row = new PDataRow();
-            row->CopyFrom(request->data(i));
-            pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
-                                   sizeof(row) + row->ByteSizeLong());
+    DorisMetrics::instance()->send_data->increment(1);
+    _heavy_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId fragment_instance_id;
+        fragment_instance_id.hi = request->fragment_instance_id().hi();
+        fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+        if (pipe == nullptr) {
+            response->mutable_status()->set_status_code(1);
+            response->mutable_status()->add_error_msgs("pipe is null");
+        } else {
+            for (int i = 0; i < request->data_size(); ++i) {
+                PDataRow* row = new PDataRow();
+                row->CopyFrom(request->data(i));
+                pipe->append_and_flush(reinterpret_cast<char*>(&row), 
sizeof(row),
+                                       sizeof(row) + row->ByteSizeLong());
+            }
+            response->mutable_status()->set_status_code(0);
         }
-        response->mutable_status()->set_status_code(0);
-    }
+    });
 }
 
 void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
                                   const PCommitRequest* request, 
PCommitResult* response,
                                   google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        pipe->finish();
-        response->mutable_status()->set_status_code(0);
-    }
+    DorisMetrics::instance()->commit->increment(1);
+    _light_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId fragment_instance_id;
+        fragment_instance_id.hi = request->fragment_instance_id().hi();
+        fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+        if (pipe == nullptr) {
+            response->mutable_status()->set_status_code(1);
+            response->mutable_status()->add_error_msgs("pipe is null");
+        } else {
+            pipe->finish();
+            response->mutable_status()->set_status_code(0);
+        }
+    });
 }
 
 void PInternalServiceImpl::rollback(google::protobuf::RpcController* 
controller,
                                     const PRollbackRequest* request, 
PRollbackResult* response,
                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        pipe->cancel("rollback");
-        response->mutable_status()->set_status_code(0);
-    }
+    DorisMetrics::instance()->rollback->increment(1);
+    _light_work_pool.offer([this, request, response, done]() {

Review Comment:
   warning: lambda capture 'controller' is not used 
[clang-diagnostic-unused-lambda-capture]
   
   ```suggestion
       _light_work_pool.offer([this, request, response, done]() {
   ```
   



##########
be/src/service/internal_service.cpp:
##########
@@ -668,75 +760,86 @@
                                              const PCheckRPCChannelRequest* 
request,
                                              PCheckRPCChannelResponse* 
response,
                                              google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    response->mutable_status()->set_status_code(0);
-    if (request->data().size() != request->size()) {
-        std::stringstream ss;
-        ss << "data size not same, expected: " << request->size()
-           << ", actual: " << request->data().size();
-        response->mutable_status()->add_error_msgs(ss.str());
-        response->mutable_status()->set_status_code(1);
-
-    } else {
-        Md5Digest digest;
-        digest.update(static_cast<const void*>(request->data().c_str()), 
request->data().size());
-        digest.digest();
-        if (!iequal(digest.hex(), request->md5())) {
+    DorisMetrics::instance()->check_rpc_channel->increment(1);
+    _light_work_pool.offer([controller, request, response, done]() {

Review Comment:
   warning: lambda capture 'this' is not used 
[clang-diagnostic-unused-lambda-capture]
   
   ```suggestion
       _light_work_pool.offer([controller, request, response, done]() {
   ```
   



##########
be/src/service/internal_service.cpp:
##########
@@ -281,315 +327,354 @@
     }
 }
 
-void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* 
controller,
                                                 const 
PCancelPlanFragmentRequest* request,
                                                 PCancelPlanFragmentResult* 
result,
                                                 google::protobuf::Closure* 
done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", 
cntl_base);
-    auto scope = OpentelemetryScope {span};
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId tid;
-    tid.__set_hi(request->finst_id().hi());
-    tid.__set_lo(request->finst_id().lo());
-
-    Status st = Status::OK();
-    if (request->has_cancel_reason()) {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
-                  << ", reason: " << request->cancel_reason();
-        _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
-    } else {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
-        _exec_env->fragment_mgr()->cancel(tid);
-    }
+    DorisMetrics::instance()->cancel_plan_fragment->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        auto span = 
telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+        auto scope = OpentelemetryScope {span};
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId tid;
+        tid.__set_hi(request->finst_id().hi());
+        tid.__set_lo(request->finst_id().lo());
+
+        Status st = Status::OK();
+        if (request->has_cancel_reason()) {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid)
+                      << ", reason: " << request->cancel_reason();
+            _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+        } else {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid);
+            _exec_env->fragment_mgr()->cancel(tid);
+        }
 
-    // TODO: the logic seems useless, cancel only return Status::OK. remove it
-    st.to_protobuf(result->mutable_status());
+        // TODO: the logic seems useless, cancel only return Status::OK. 
remove it
+        st.to_protobuf(result->mutable_status());
+    });
 }
 
-void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* 
controller,
                                       const PFetchDataRequest* request, 
PFetchDataResult* result,
                                       google::protobuf::Closure* done) {
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
-    _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    DorisMetrics::instance()->fetch_data->increment(1);
+    _heavy_work_pool.offer([this, controller, request, result, done]() {
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+        _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    });
 }
 
 void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* 
controller,
                                               const PFetchTableSchemaRequest* 
request,
                                               PFetchTableSchemaResult* result,
                                               google::protobuf::Closure* done) 
{
-    VLOG_RPC << "fetch table schema";
-    brpc::ClosureGuard closure_guard(done);
-    TFileScanRange file_scan_range;
-    Status st = Status::OK();
-    {
-        const uint8_t* buf = (const 
uint8_t*)(request->file_scan_range().data());
-        uint32_t len = request->file_scan_range().size();
-        st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+    DorisMetrics::instance()->fetch_table_schema->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TFileScanRange file_scan_range;
+        Status st = Status::OK();
+        {
+            const uint8_t* buf = (const 
uint8_t*)(request->file_scan_range().data());
+            uint32_t len = request->file_scan_range().size();
+            st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+            if (!st.ok()) {
+                LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+                st.to_protobuf(result->mutable_status());
+                return;
+            }
+        }
+        if (file_scan_range.__isset.ranges == false) {
+            st = Status::InternalError("can not get TFileRangeDesc.");
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        if (file_scan_range.__isset.params == false) {
+            st = Status::InternalError("can not get TFileScanRangeParams.");
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        const TFileRangeDesc& range = file_scan_range.ranges.at(0);
+        const TFileScanRangeParams& params = file_scan_range.params;
+
+        std::unique_ptr<vectorized::GenericReader> reader(nullptr);
+        std::unique_ptr<RuntimeProfile> profile(new 
RuntimeProfile("FetchTableSchema"));
+        IOContext io_ctx;
+        FileCacheStatistics file_cache_statis;
+        io_ctx.file_cache_stats = &file_cache_statis;
+        switch (params.format_type) {
+        case TFileFormatType::FORMAT_CSV_PLAIN:
+        case TFileFormatType::FORMAT_CSV_GZ:
+        case TFileFormatType::FORMAT_CSV_BZ2:
+        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+        case TFileFormatType::FORMAT_CSV_LZOP:
+        case TFileFormatType::FORMAT_CSV_DEFLATE: {
+            // file_slots is no use
+            std::vector<SlotDescriptor*> file_slots;
+            reader.reset(
+                    new vectorized::CsvReader(profile.get(), params, range, 
file_slots, &io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_PARQUET: {
+            reader.reset(new vectorized::ParquetReader(params, range, 
&io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_ORC: {
+            std::vector<std::string> column_names;
+            reader.reset(new vectorized::OrcReader(params, range, 
column_names, "", &io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_JSON: {
+            std::vector<SlotDescriptor*> file_slots;
+            reader.reset(new vectorized::NewJsonReader(profile.get(), params, 
range, file_slots,
+                                                       &io_ctx));
+            break;
+        }
+        default:
+            st = Status::InternalError("Not supported file format in fetch 
table schema: {}",
+                                       params.format_type);
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        std::vector<std::string> col_names;
+        std::vector<TypeDescriptor> col_types;
+        st = reader->get_parsed_schema(&col_names, &col_types);
         if (!st.ok()) {
             LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
             st.to_protobuf(result->mutable_status());
             return;
         }
-    }
-    if (file_scan_range.__isset.ranges == false) {
-        st = Status::InternalError("can not get TFileRangeDesc.");
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    if (file_scan_range.__isset.params == false) {
-        st = Status::InternalError("can not get TFileScanRangeParams.");
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    const TFileRangeDesc& range = file_scan_range.ranges.at(0);
-    const TFileScanRangeParams& params = file_scan_range.params;
-
-    std::unique_ptr<vectorized::GenericReader> reader(nullptr);
-    std::unique_ptr<RuntimeProfile> profile(new 
RuntimeProfile("FetchTableSchema"));
-    IOContext io_ctx;
-    FileCacheStatistics file_cache_statis;
-    io_ctx.file_cache_stats = &file_cache_statis;
-    switch (params.format_type) {
-    case TFileFormatType::FORMAT_CSV_PLAIN:
-    case TFileFormatType::FORMAT_CSV_GZ:
-    case TFileFormatType::FORMAT_CSV_BZ2:
-    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
-    case TFileFormatType::FORMAT_CSV_LZOP:
-    case TFileFormatType::FORMAT_CSV_DEFLATE: {
-        // file_slots is no use
-        std::vector<SlotDescriptor*> file_slots;
-        reader.reset(new vectorized::CsvReader(profile.get(), params, range, 
file_slots, &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_PARQUET: {
-        reader.reset(new vectorized::ParquetReader(params, range, &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_ORC: {
-        std::vector<std::string> column_names;
-        reader.reset(new vectorized::OrcReader(params, range, column_names, 
"", &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_JSON: {
-        std::vector<SlotDescriptor*> file_slots;
-        reader.reset(
-                new vectorized::NewJsonReader(profile.get(), params, range, 
file_slots, &io_ctx));
-        break;
-    }
-    default:
-        st = Status::InternalError("Not supported file format in fetch table 
schema: {}",
-                                   params.format_type);
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    std::vector<std::string> col_names;
-    std::vector<TypeDescriptor> col_types;
-    st = reader->get_parsed_schema(&col_names, &col_types);
-    if (!st.ok()) {
-        LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+        result->set_column_nums(col_names.size());
+        for (size_t idx = 0; idx < col_names.size(); ++idx) {
+            result->add_column_names(col_names[idx]);
+        }
+        for (size_t idx = 0; idx < col_types.size(); ++idx) {
+            PTypeDesc* type_desc = result->add_column_types();
+            col_types[idx].to_protobuf(type_desc);
+        }
         st.to_protobuf(result->mutable_status());
-        return;
-    }
-    result->set_column_nums(col_names.size());
-    for (size_t idx = 0; idx < col_names.size(); ++idx) {
-        result->add_column_names(col_names[idx]);
-    }
-    for (size_t idx = 0; idx < col_types.size(); ++idx) {
-        PTypeDesc* type_desc = result->add_column_types();
-        col_types[idx].to_protobuf(type_desc);
-    }
-    st.to_protobuf(result->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::get_info(google::protobuf::RpcController* 
controller,
                                     const PProxyRequest* request, 
PProxyResult* response,
                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    // PProxyRequest is defined in gensrc/proto/internal_service.proto
-    // Currently it supports 2 kinds of requests:
-    // 1. get all kafka partition ids for given topic
-    // 2. get all kafka partition offsets for given topic and timestamp.
-    if (request->has_kafka_meta_request()) {
-        const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
-        if (!kafka_request.partition_id_for_latest_offsets().empty()) {
-            // get latest offsets for specified partition ids
-            std::vector<PIntegerPair> partition_offsets;
-            Status st = _exec_env->routine_load_task_executor()
-                                ->get_kafka_latest_offsets_for_partitions(
-                                        request->kafka_meta_request(), 
&partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+    DorisMetrics::instance()->get_info->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        // PProxyRequest is defined in gensrc/proto/internal_service.proto
+        // Currently it supports 2 kinds of requests:
+        // 1. get all kafka partition ids for given topic
+        // 2. get all kafka partition offsets for given topic and timestamp.
+        if (request->has_kafka_meta_request()) {
+            const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
+            if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+                // get latest offsets for specified partition ids
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_latest_offsets_for_partitions(
+                                            request->kafka_meta_request(), 
&partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else if (!kafka_request.offset_times().empty()) {
-            // if offset_times() has elements, which means this request is to 
get offset by timestamp.
-            std::vector<PIntegerPair> partition_offsets;
-            Status st =
-                    
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
-                            request->kafka_meta_request(), &partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else if (!kafka_request.offset_times().empty()) {
+                // if offset_times() has elements, which means this request is 
to get offset by timestamp.
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_partition_offsets_for_times(
+                                            request->kafka_meta_request(), 
&partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else {
-            // get partition ids of topic
-            std::vector<int32_t> partition_ids;
-            Status st = 
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
-                    request->kafka_meta_request(), &partition_ids);
-            if (st.ok()) {
-                PKafkaMetaProxyResult* kafka_result = 
response->mutable_kafka_meta_result();
-                for (int32_t id : partition_ids) {
-                    kafka_result->add_partition_ids(id);
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else {
+                // get partition ids of topic
+                std::vector<int32_t> partition_ids;
+                Status st = 
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+                        request->kafka_meta_request(), &partition_ids);
+                if (st.ok()) {
+                    PKafkaMetaProxyResult* kafka_result = 
response->mutable_kafka_meta_result();
+                    for (int32_t id : partition_ids) {
+                        kafka_result->add_partition_ids(id);
+                    }
                 }
+                st.to_protobuf(response->mutable_status());
+                return;
             }
-            st.to_protobuf(response->mutable_status());
-            return;
         }
-    }
-    Status::OK().to_protobuf(response->mutable_status());
+        Status::OK().to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::update_cache(google::protobuf::RpcController* 
controller,
                                         const PUpdateCacheRequest* request,
                                         PCacheResponse* response, 
google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->update(request, response);
+    DorisMetrics::instance()->update_cache->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->update(request, response);
+    });
 }
 
 void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* 
controller,
                                        const PFetchCacheRequest* request, 
PFetchCacheResult* result,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->fetch(request, result);
+    DorisMetrics::instance()->fetch_cache->increment(1);
+    _light_work_pool.offer([this, controller, request, result, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->fetch(request, result);
+    });
 }
 
 void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* 
controller,
                                        const PClearCacheRequest* request, 
PCacheResponse* response,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->clear(request, response);
+    DorisMetrics::instance()->clear_cache->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->clear(request, response);
+    });
 }
 
 void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* 
controller,
                                         const ::doris::PMergeFilterRequest* 
request,
                                         ::doris::PMergeFilterResponse* 
response,
                                         ::google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
-    if (!st.ok()) {
-        LOG(WARNING) << "merge meet error" << st.to_string();
-    }
-    st.to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->merge_filter->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "merge meet error" << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* 
controller,
                                         const ::doris::PPublishFilterRequest* 
request,
                                         ::doris::PPublishFilterResponse* 
response,
                                         ::google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    UniqueId unique_id(request->query_id());
-    VLOG_NOTICE << "rpc apply_filter recv";
-    Status st = _exec_env->fragment_mgr()->apply_filter(request, 
&zero_copy_input_stream);
-    if (!st.ok()) {
-        LOG(WARNING) << "apply filter meet error: " << st.to_string();
-    }
-    st.to_protobuf(response->mutable_status());
+    DorisMetrics::instance()->apply_filter->increment(1);
+    _light_work_pool.offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        UniqueId unique_id(request->query_id());
+        VLOG_NOTICE << "rpc apply_filter recv";
+        Status st = _exec_env->fragment_mgr()->apply_filter(request, 
&zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "apply filter meet error: " << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
 }
 
 void PInternalServiceImpl::send_data(google::protobuf::RpcController* 
controller,
                                      const PSendDataRequest* request, 
PSendDataResult* response,
                                      google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        for (int i = 0; i < request->data_size(); ++i) {
-            PDataRow* row = new PDataRow();
-            row->CopyFrom(request->data(i));
-            pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
-                                   sizeof(row) + row->ByteSizeLong());
+    DorisMetrics::instance()->send_data->increment(1);
+    _heavy_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId fragment_instance_id;
+        fragment_instance_id.hi = request->fragment_instance_id().hi();
+        fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+        if (pipe == nullptr) {
+            response->mutable_status()->set_status_code(1);
+            response->mutable_status()->add_error_msgs("pipe is null");
+        } else {
+            for (int i = 0; i < request->data_size(); ++i) {
+                PDataRow* row = new PDataRow();
+                row->CopyFrom(request->data(i));
+                pipe->append_and_flush(reinterpret_cast<char*>(&row), 
sizeof(row),
+                                       sizeof(row) + row->ByteSizeLong());
+            }
+            response->mutable_status()->set_status_code(0);
         }
-        response->mutable_status()->set_status_code(0);
-    }
+    });
 }
 
 void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
                                   const PCommitRequest* request, 
PCommitResult* response,
                                   google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        pipe->finish();
-        response->mutable_status()->set_status_code(0);
-    }
+    DorisMetrics::instance()->commit->increment(1);
+    _light_work_pool.offer([this, request, response, done]() {

Review Comment:
   warning: lambda capture 'controller' is not used [clang-diagnostic-unused-lambda-capture]
   
   ```suggestion
       _light_work_pool.offer([this, request, response, done]() {
   ```
   



##########
be/src/service/internal_service.cpp:
##########
@@ -668,75 +760,86 @@
                                              const PCheckRPCChannelRequest* 
request,
                                              PCheckRPCChannelResponse* 
response,
                                              google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    response->mutable_status()->set_status_code(0);
-    if (request->data().size() != request->size()) {
-        std::stringstream ss;
-        ss << "data size not same, expected: " << request->size()
-           << ", actual: " << request->data().size();
-        response->mutable_status()->add_error_msgs(ss.str());
-        response->mutable_status()->set_status_code(1);
-
-    } else {
-        Md5Digest digest;
-        digest.update(static_cast<const void*>(request->data().c_str()), 
request->data().size());
-        digest.digest();
-        if (!iequal(digest.hex(), request->md5())) {
+    DorisMetrics::instance()->check_rpc_channel->increment(1);
+    _light_work_pool.offer([controller, request, response, done]() {

Review Comment:
   warning: lambda capture 'controller' is not used [clang-diagnostic-unused-lambda-capture]
   
   ```suggestion
       _light_work_pool.offer([this, request, response, done]() {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to