This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 890a8bb098 [refactor] intro pthread pool for brpc and hadoop libhdfs (#18513)
890a8bb098 is described below

commit 890a8bb098a38019d949e3b261929b22bdeee611
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Apr 21 22:05:29 2023 +0800

    [refactor] intro pthread pool for brpc and hadoop libhdfs (#18513)
    
    1. cherry-pick [Improvement](brpc) Using a thread pool for RPC service avoiding std::mutex block brpc::bthread (#16639)
    2. cherry-pick [fix](brpc) solve bthread hang problem (#17206)
    3. cherry-pick [deps](libhdfs) add official hadoop libhdfs for x86 (#17435)
---
 be/CMakeLists.txt                                  |  25 +-
 be/src/common/config.h                             |  16 +-
 be/src/io/CMakeLists.txt                           |   1 +
 be/src/{util/hdfs_util.cpp => io/fs/err_utils.cpp} |  39 +-
 be/src/{util/hdfs_util.h => io/fs/err_utils.h}     |  26 +-
 be/src/{util/hdfs_util.h => io/fs/hdfs.h}          |  27 +-
 be/src/io/hdfs_builder.cpp                         |   7 +-
 be/src/io/hdfs_builder.h                           |   8 +-
 be/src/io/hdfs_file_reader.cpp                     |  14 +-
 be/src/io/hdfs_writer.cpp                          |  21 +-
 be/src/service/internal_service.cpp                | 927 +++++++++++++--------
 be/src/service/internal_service.h                  |   9 +-
 be/src/util/blocking_priority_queue.hpp            |  12 +
 be/src/util/doris_metrics.h                        |  10 +
 be/src/util/hdfs_storage_backend.cpp               |   5 +-
 be/src/util/hdfs_storage_backend.h                 |   3 +-
 be/src/util/hdfs_util.cpp                          |   3 +-
 be/src/util/hdfs_util.h                            |   5 +-
 be/src/util/jni-util.cpp                           |  97 +--
 be/src/util/jni-util.h                             |   5 +
 be/src/util/libjvm_loader.cpp                      |   9 +
 be/src/util/priority_thread_pool.hpp               |  11 +-
 bin/start_be.sh                                    |  54 +-
 build.sh                                           |   4 +
 conf/be.conf                                       |   4 +-
 .../maint-monitor/monitor-metrics/metrics.md       |   6 +
 .../org/apache/doris/clone/BeLoadRebalancer.java   |   2 +-
 gensrc/proto/internal_service.proto                |   1 +
 28 files changed, 842 insertions(+), 509 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 114e7be057..b3ad886910 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -398,9 +398,6 @@ set_target_properties(k5crypto PROPERTIES IMPORTED_LOCATION 
${THIRDPARTY_DIR}/li
 add_library(gssapi_krb5 STATIC IMPORTED)
 set_target_properties(gssapi_krb5 PROPERTIES IMPORTED_LOCATION 
${THIRDPARTY_DIR}/lib/libgssapi_krb5.a)
 
-add_library(hdfs3 STATIC IMPORTED)
-set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION 
${THIRDPARTY_DIR}/lib/libhdfs3.a)
-
 find_program(THRIFT_COMPILER thrift ${CMAKE_SOURCE_DIR}/bin)
 
 if (OS_MACOSX)
@@ -719,12 +716,32 @@ set(COMMON_THIRDPARTY
     # put this after lz4 to avoid using lz4 lib in librdkafka
     librdkafka_cpp
     librdkafka
-    hdfs3
     xml2
     lzma
     simdjson
 )
 
+if (ARCH_AMD64 AND OS_LINUX)
+    add_library(hadoop_hdfs STATIC IMPORTED)
+    set_target_properties(hadoop_hdfs PROPERTIES IMPORTED_LOCATION 
${THIRDPARTY_DIR}/lib/hadoop_hdfs/native/libhdfs.a)
+
+    set(COMMON_THIRDPARTY
+        ${COMMON_THIRDPARTY}
+        hadoop_hdfs
+    )
+    add_definitions(-DUSE_HADOOP_HDFS)
+else()
+    add_library(hdfs3 STATIC IMPORTED)
+    set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION 
${THIRDPARTY_DIR}/lib/libhdfs3.a)
+
+    # TODO: use arm hadoop hdfs to replace this
+    set(COMMON_THIRDPARTY
+        ${COMMON_THIRDPARTY}
+        hdfs3
+    )
+    add_definitions(-DUSE_LIBHDFS3)
+endif()
+
 if (OS_MACOSX)
     set(COMMON_THIRDPARTY
         ${COMMON_THIRDPARTY}
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 94cad91159..91e77f0a9c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -35,7 +35,8 @@ CONF_Int32(be_port, "9060");
 // port for brpc
 CONF_Int32(brpc_port, "8060");
 
-// the number of bthreads for brpc, the default value is set to -1, which 
means the number of bthreads is #cpu-cores
+// the number of bthreads for brpc, the default value is set to -1,
+// which means the number of bthreads is #cpu-cores
 CONF_Int32(brpc_num_threads, "-1");
 
 // port to brpc server for single replica load
@@ -395,8 +396,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
 CONF_Int64(load_data_reserve_hours, "4");
 // log error log will be removed after this time
 CONF_mInt64(load_error_log_reserve_hours, "48");
-CONF_Int32(number_tablet_writer_threads, "16");
-CONF_Int32(number_slave_replica_download_threads, "64");
+
+// be brpc interface is classified into two categories: light and heavy
+// each category has a different number of threads
+// threads to handle heavy api interface, such as transmit_data/transmit_block 
etc
+CONF_Int32(brpc_heavy_work_pool_threads, "192");
+// threads to handle light api interface, such as 
exec_plan_fragment_prepare/exec_plan_fragment_start
+CONF_Int32(brpc_light_work_pool_threads, "32");
+CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
+CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");
 
 // The maximum amount of data that can be processed by a stream load
 CONF_mInt64(streaming_load_max_mb, "10240");
@@ -898,8 +906,6 @@ CONF_Int32(segcompaction_threshold_segment_num, "10");
 // The segment whose row number above the threshold will be compacted during 
segcompaction
 CONF_Int32(segcompaction_small_threshold, "1048576");
 
-CONF_String(jvm_max_heap_size, "1024M");
-
 // enable java udf and jdbc scannode
 CONF_Bool(enable_java_support, "true");
 
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index 4096d2557b..2a36d4a3c2 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -34,6 +34,7 @@ set(IO_FILES
     local_file_writer.cpp
     s3_reader.cpp
     s3_writer.cpp
+    fs/err_utils.cpp
     fs/file_system_map.cpp
     fs/local_file_reader.cpp
     fs/local_file_system.cpp
diff --git a/be/src/util/hdfs_util.cpp b/be/src/io/fs/err_utils.cpp
similarity index 53%
copy from be/src/util/hdfs_util.cpp
copy to be/src/io/fs/err_utils.cpp
index b58fc75e46..d08ea1bee4 100644
--- a/be/src/util/hdfs_util.cpp
+++ b/be/src/io/fs/err_utils.cpp
@@ -14,29 +14,40 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+#include "io/fs/err_utils.h"
 
-#include "util/hdfs_util.h"
+#include <fmt/format.h>
 
-#include <util/string_util.h>
+#include <sstream>
 
-#include "common/config.h"
-#include "common/logging.h"
+#include "io/fs/hdfs.h"
 
 namespace doris {
+namespace io {
 
-HDFSHandle& HDFSHandle::instance() {
-    static HDFSHandle hdfs_handle;
-    return hdfs_handle;
+std::string errno_to_str() {
+    char buf[1024];
+    return fmt::format("({}), {}", errno, strerror_r(errno, buf, 1024));
 }
 
-hdfsFS HDFSHandle::create_hdfs_fs(HDFSCommonBuilder& hdfs_builder) {
-    hdfsFS hdfs_fs = hdfsBuilderConnect(hdfs_builder.get());
-    if (hdfs_fs == nullptr) {
-        LOG(WARNING) << "connect to hdfs failed."
-                     << ", error: " << hdfsGetLastError();
-        return nullptr;
+std::string errcode_to_str(const std::error_code& ec) {
+    return fmt::format("({}), {}", ec.value(), ec.message());
+}
+
+std::string hdfs_error() {
+    std::stringstream ss;
+    char buf[1024];
+    ss << "(" << errno << "), " << strerror_r(errno, buf, 1024) << ")";
+#ifdef USE_HADOOP_HDFS
+    char* root_cause = hdfsGetLastExceptionRootCause();
+    if (root_cause != nullptr) {
+        ss << ", reason: " << root_cause;
     }
-    return hdfs_fs;
+#else
+    ss << ", reason: " << hdfsGetLastError();
+#endif
+    return ss.str();
 }
 
+} // namespace io
 } // namespace doris
diff --git a/be/src/util/hdfs_util.h b/be/src/io/fs/err_utils.h
similarity index 73%
copy from be/src/util/hdfs_util.h
copy to be/src/io/fs/err_utils.h
index f7bfc14b3a..31ca702c32 100644
--- a/be/src/util/hdfs_util.h
+++ b/be/src/io/fs/err_utils.h
@@ -17,27 +17,15 @@
 
 #pragma once
 
-#include <hdfs/hdfs.h>
-
-#include <map>
-#include <memory>
 #include <string>
-
-#include "common/status.h"
-#include "io/hdfs_builder.h"
+#include <system_error>
 
 namespace doris {
+namespace io {
 
-class HDFSHandle {
-public:
-    ~HDFSHandle() {}
-
-    static HDFSHandle& instance();
-
-    hdfsFS create_hdfs_fs(HDFSCommonBuilder& builder);
-
-private:
-    HDFSHandle() {}
-};
+std::string errno_to_str();
+std::string errcode_to_str(const std::error_code& ec);
+std::string hdfs_error();
 
-} // namespace doris
\ No newline at end of file
+} // namespace io
+} // namespace doris
diff --git a/be/src/util/hdfs_util.h b/be/src/io/fs/hdfs.h
similarity index 72%
copy from be/src/util/hdfs_util.h
copy to be/src/io/fs/hdfs.h
index f7bfc14b3a..eb9e1b2c07 100644
--- a/be/src/util/hdfs_util.h
+++ b/be/src/io/fs/hdfs.h
@@ -17,27 +17,8 @@
 
 #pragma once
 
+#ifdef USE_HADOOP_HDFS
+#include <hadoop_hdfs/hdfs.h>
+#else
 #include <hdfs/hdfs.h>
-
-#include <map>
-#include <memory>
-#include <string>
-
-#include "common/status.h"
-#include "io/hdfs_builder.h"
-
-namespace doris {
-
-class HDFSHandle {
-public:
-    ~HDFSHandle() {}
-
-    static HDFSHandle& instance();
-
-    hdfsFS create_hdfs_fs(HDFSCommonBuilder& builder);
-
-private:
-    HDFSHandle() {}
-};
-
-} // namespace doris
\ No newline at end of file
+#endif
diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp
index c82af8e299..c971173c91 100644
--- a/be/src/io/hdfs_builder.cpp
+++ b/be/src/io/hdfs_builder.cpp
@@ -18,6 +18,7 @@
 #include "io/hdfs_builder.h"
 
 #include <fmt/format.h>
+#include <thrift/protocol/TDebugProtocol.h>
 
 #include <fstream>
 
@@ -26,6 +27,7 @@
 #include "util/string_util.h"
 #include "util/uid_util.h"
 #include "util/url_coding.h"
+
 namespace doris {
 
 Status HDFSCommonBuilder::init_hdfs_builder() {
@@ -36,6 +38,7 @@ Status HDFSCommonBuilder::init_hdfs_builder() {
         return Status::InternalError(
                 "failed to init HDFSCommonBuilder, please check 
be/conf/hdfs-site.xml and be.out");
     }
+    hdfsBuilderSetForceNewInstance(hdfs_builder);
     return Status::OK();
 }
 
@@ -54,7 +57,10 @@ Status HDFSCommonBuilder::run_kinit() {
     if (!rc) {
         return Status::InternalError("Kinit failed, errMsg: " + msg);
     }
+#ifdef USE_LIBHDFS3
+    hdfsBuilderSetPrincipal(hdfs_builder, hdfs_kerberos_principal.c_str());
     hdfsBuilderSetKerbTicketCachePath(hdfs_builder, ticket_path.c_str());
+#endif
     return Status::OK();
 }
 
@@ -97,7 +103,6 @@ Status createHDFSBuilder(const THdfsParams& hdfsParams, 
HDFSCommonBuilder* build
     if (hdfsParams.__isset.hdfs_kerberos_principal) {
         builder->need_kinit = true;
         builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
-        hdfsBuilderSetPrincipal(builder->get(), 
hdfsParams.hdfs_kerberos_principal.c_str());
     }
     if (hdfsParams.__isset.hdfs_kerberos_keytab) {
         builder->need_kinit = true;
diff --git a/be/src/io/hdfs_builder.h b/be/src/io/hdfs_builder.h
index eb63fab1b5..26164ce6ed 100644
--- a/be/src/io/hdfs_builder.h
+++ b/be/src/io/hdfs_builder.h
@@ -17,10 +17,9 @@
 
 #pragma once
 
-#include <hdfs/hdfs.h>
-
 #include "gen_cpp/PlanNodes_types.h"
 #include "io/file_reader.h"
+#include "io/fs/hdfs.h"
 
 namespace doris {
 
@@ -38,9 +37,12 @@ class HDFSCommonBuilder {
 public:
     HDFSCommonBuilder() = default;
     ~HDFSCommonBuilder() {
+#if defined(USE_LIBHDFS3) || defined(BE_TEST)
+        // for hadoop hdfs, the hdfs_builder will be freed in hdfsBuilderConnect
         if (hdfs_builder != nullptr) {
             hdfsFreeBuilder(hdfs_builder);
         }
+#endif
     }
 
     // Must call this to init hdfs_builder first.
@@ -51,7 +53,7 @@ public:
     Status run_kinit();
 
 private:
-    hdfsBuilder* hdfs_builder;
+    hdfsBuilder* hdfs_builder = nullptr;
     bool need_kinit {false};
     std::string hdfs_kerberos_keytab;
     std::string hdfs_kerberos_principal;
diff --git a/be/src/io/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp
index 7117ec0e7c..7e9394d707 100644
--- a/be/src/io/hdfs_file_reader.cpp
+++ b/be/src/io/hdfs_file_reader.cpp
@@ -14,11 +14,13 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 #include "io/hdfs_file_reader.h"
 
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "io/fs/err_utils.h"
 #include "service/backend_options.h"
 
 namespace doris {
@@ -100,12 +102,12 @@ Status HdfsFileReader::open() {
             if (_hdfs_fs == nullptr) {
                 return Status::InternalError(
                         "open file failed. (BE: {}) namenode:{}, path:{}, err: 
{}",
-                        BackendOptions::get_localhost(), _namenode, _path, 
hdfsGetLastError());
+                        BackendOptions::get_localhost(), _namenode, _path, 
io::hdfs_error());
             }
         } else {
             return Status::InternalError("open file failed. (BE: {}) 
namenode:{}, path:{}, err: {}",
                                          BackendOptions::get_localhost(), 
_namenode, _path,
-                                         hdfsGetLastError());
+                                         io::hdfs_error());
         }
     }
     VLOG_NOTICE << "open file, namenode:" << _namenode << ", path:" << _path;
@@ -174,7 +176,7 @@ Status HdfsFileReader::readat(int64_t position, int64_t 
nbytes, int64_t* bytes_r
         if (loop_read < 0) {
             return Status::InternalError(
                     "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, 
err: {}",
-                    BackendOptions::get_localhost(), _namenode, _path, 
hdfsGetLastError());
+                    BackendOptions::get_localhost(), _namenode, _path, 
io::hdfs_error());
         }
         if (loop_read == 0) {
             break;
@@ -192,7 +194,7 @@ int64_t HdfsFileReader::size() {
             hdfsFileInfo* file_info = hdfsGetPathInfo(_hdfs_fs, _path.c_str());
             if (file_info == nullptr) {
                 return Status::IOError("failed to get path info, path: {}, 
error: {}", _path,
-                                       hdfsGetLastError());
+                                       io::hdfs_error());
             }
             _file_size = file_info->mSize;
             hdfsFreeFileInfo(file_info, 1);
@@ -205,7 +207,7 @@ Status HdfsFileReader::seek(int64_t position) {
     int res = hdfsSeek(_hdfs_fs, _hdfs_file, position);
     if (res != 0) {
         return Status::InternalError("Seek to offset failed. (BE: {}) 
offset={}, err: {}",
-                                     BackendOptions::get_localhost(), 
position, hdfsGetLastError());
+                                     BackendOptions::get_localhost(), 
position, io::hdfs_error());
     }
     _current_offset = position;
     return Status::OK();
@@ -223,7 +225,7 @@ Status HdfsFsCache::_create_fs(THdfsParams& hdfs_params, 
hdfsFS* fs) {
     RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder));
     hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
     if (hdfs_fs == nullptr) {
-        return Status::InternalError("connect to hdfs failed. error: {}", 
hdfsGetLastError());
+        return Status::InternalError("connect to hdfs failed. error: {}", 
io::hdfs_error());
     }
     *fs = hdfs_fs;
     return Status::OK();
diff --git a/be/src/io/hdfs_writer.cpp b/be/src/io/hdfs_writer.cpp
index e6814438f1..69554d82a9 100644
--- a/be/src/io/hdfs_writer.cpp
+++ b/be/src/io/hdfs_writer.cpp
@@ -17,10 +17,14 @@
 
 #include "io/hdfs_writer.h"
 
+#include <bthread/bthread.h>
+
 #include <filesystem>
 
 #include "common/logging.h"
+#include "io/fs/err_utils.h"
 #include "service/backend_options.h"
+#include "util/stack_util.h"
 
 namespace doris {
 
@@ -56,16 +60,18 @@ Status HDFSWriter::open() {
 
     std::filesystem::path hdfs_path(_path);
     std::string hdfs_dir = hdfs_path.parent_path().string();
+    LOG(INFO) << "hdfs write open: " << hdfs_dir << get_stack_trace();
     exists = hdfsExists(_hdfs_fs, hdfs_dir.c_str());
     if (exists != 0) {
-        VLOG_NOTICE << "hdfs dir doesn't exist, create it: " << hdfs_dir;
+        LOG(INFO) << "hdfs dir doesn't exist, create it: " << hdfs_dir << ", 
path: " << _path
+                  << get_stack_trace();
         int ret = hdfsCreateDirectory(_hdfs_fs, hdfs_dir.c_str());
         if (ret != 0) {
             std::stringstream ss;
             ss << "create dir failed. "
                << "(BE: " << BackendOptions::get_localhost() << ")"
                << " namenode: " << _namenode << " path: " << hdfs_dir
-               << ", err: " << hdfsGetLastError();
+               << ", err: " << io::hdfs_error();
             LOG(WARNING) << ss.str();
             return Status::InternalError(ss.str());
         }
@@ -76,7 +82,7 @@ Status HDFSWriter::open() {
         std::stringstream ss;
         ss << "open file failed. "
            << "(BE: " << BackendOptions::get_localhost() << ")"
-           << " namenode:" << _namenode << " path:" << _path << ", err: " << 
hdfsGetLastError();
+           << " namenode:" << _namenode << " path:" << _path << ", err: " << 
io::hdfs_error();
         LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
@@ -94,7 +100,7 @@ Status HDFSWriter::write(const uint8_t* buf, size_t buf_len, 
size_t* written_len
         std::stringstream ss;
         ss << "write file failed. "
            << "(BE: " << BackendOptions::get_localhost() << ")"
-           << "namenode:" << _namenode << " path:" << _path << ", err: " << 
hdfsGetLastError();
+           << "namenode:" << _namenode << " path:" << _path << ", err: " << 
io::hdfs_error();
         LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
@@ -121,7 +127,7 @@ Status HDFSWriter::close() {
         std::stringstream ss;
         ss << "failed to flush hdfs file. "
            << "(BE: " << BackendOptions::get_localhost() << ")"
-           << "namenode:" << _namenode << " path:" << _path << ", err: " << 
hdfsGetLastError();
+           << "namenode:" << _namenode << " path:" << _path << ", err: " << 
io::hdfs_error();
         LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
@@ -135,11 +141,12 @@ Status HDFSWriter::close() {
 
 Status HDFSWriter::_connect() {
     HDFSCommonBuilder builder;
-    RETURN_IF_ERROR(createHDFSBuilder(_properties, &builder));
+    THdfsParams hdfsParams = parse_properties(_properties);
+    RETURN_IF_ERROR(createHDFSBuilder(hdfsParams, &builder));
     _hdfs_fs = hdfsBuilderConnect(builder.get());
     if (_hdfs_fs == nullptr) {
         return Status::InternalError("connect to hdfs failed. namenode 
address:{}, error {}",
-                                     _namenode, hdfsGetLastError());
+                                     _namenode, io::hdfs_error());
     }
     return Status::OK();
 }
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 1f658da09c..6db5a55668 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -61,7 +61,15 @@ using namespace ErrorCode;
 
 const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
 
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, 
MetricUnit::NOUNIT);
+
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);
 
 bthread_key_t btls_key;
 
@@ -95,16 +103,42 @@ private:
 
 PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env),
-          _tablet_worker_pool(config::number_tablet_writer_threads, 10240, 
"tablet_writer"),
-          
_slave_replica_worker_pool(config::number_slave_replica_download_threads, 10240,
-                                     "replica_download") {
-    REGISTER_HOOK_METRIC(add_batch_task_queue_size,
-                         [this]() { return 
_tablet_worker_pool.get_queue_size(); });
+          _heavy_work_pool(config::brpc_heavy_work_pool_threads,
+                           config::brpc_heavy_work_pool_max_queue_size, 
"brpc_heavy"),
+          _light_work_pool(config::brpc_light_work_pool_threads,
+                           config::brpc_light_work_pool_max_queue_size, 
"brpc_light") {
+    REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
+                         [this]() { return _heavy_work_pool.get_queue_size(); 
});
+    REGISTER_HOOK_METRIC(light_work_pool_queue_size,
+                         [this]() { return _light_work_pool.get_queue_size(); 
});
+    REGISTER_HOOK_METRIC(heavy_work_active_threads,
+                         [this]() { return 
_heavy_work_pool.get_active_threads(); });
+    REGISTER_HOOK_METRIC(light_work_active_threads,
+                         [this]() { return 
_light_work_pool.get_active_threads(); });
+
+    REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size,
+                         []() { return 
config::brpc_heavy_work_pool_max_queue_size; });
+    REGISTER_HOOK_METRIC(light_work_pool_max_queue_size,
+                         []() { return 
config::brpc_light_work_pool_max_queue_size; });
+    REGISTER_HOOK_METRIC(heavy_work_max_threads,
+                         []() { return config::brpc_heavy_work_pool_threads; 
});
+    REGISTER_HOOK_METRIC(light_work_max_threads,
+                         []() { return config::brpc_light_work_pool_threads; 
});
+
     CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
 }
 
 PInternalServiceImpl::~PInternalServiceImpl() {
-    DEREGISTER_HOOK_METRIC(add_batch_task_queue_size);
+    DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(light_work_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(heavy_work_active_threads);
+    DEREGISTER_HOOK_METRIC(light_work_active_threads);
+
+    DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size);
+    DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size);
+    DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
+    DEREGISTER_HOOK_METRIC(light_work_max_threads);
+
     CHECK_EQ(0, bthread_key_delete(btls_key));
 }
 
@@ -132,7 +166,7 @@ void 
PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController
     _transmit_data(cntl_base, new_request, response, new_done, st);
 }
 
-void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* 
controller,
                                           const PTransmitDataParams* request,
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done,
@@ -170,22 +204,31 @@ void 
PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c
                                               const PTabletWriterOpenRequest* 
request,
                                               PTabletWriterOpenResult* 
response,
                                               google::protobuf::Closure* done) 
{
-    VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << 
request->index_id()
-             << ", txn_id=" << request->txn_id();
-    brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->load_channel_mgr()->open(*request);
-    if (!st.ok()) {
-        LOG(WARNING) << "load channel open failed, message=" << st << ", id=" 
<< request->id()
-                     << ", index_id=" << request->index_id() << ", txn_id=" << 
request->txn_id();
+    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+        VLOG_RPC << "tablet writer open, id=" << request->id()
+                 << ", index_id=" << request->index_id() << ", txn_id=" << 
request->txn_id();
+        brpc::ClosureGuard closure_guard(done);
+        auto st = _exec_env->load_channel_mgr()->open(*request);
+        if (!st.ok()) {
+            LOG(WARNING) << "load channel open failed, message=" << st << ", 
id=" << request->id()
+                         << ", index_id=" << request->index_id()
+                         << ", txn_id=" << request->txn_id();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
-    st.to_protobuf(response->mutable_status());
 }
 
-void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* 
controller,
                                               const PExecPlanFragmentRequest* 
request,
                                               PExecPlanFragmentResult* 
response,
                                               google::protobuf::Closure* done) 
{
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment", 
cntl_base);
+    auto span = telemetry::start_rpc_server_span("exec_plan_fragment", 
controller);
     auto scope = OpentelemetryScope {span};
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
@@ -199,67 +242,95 @@ void 
PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
     st.to_protobuf(response->mutable_status());
 }
 
-void 
PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController*
 cntl_base,
+void 
PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController*
 controller,
                                                       const 
PExecPlanFragmentRequest* request,
                                                       PExecPlanFragmentResult* 
response,
                                                       
google::protobuf::Closure* done) {
-    exec_plan_fragment(cntl_base, request, response, done);
+    bool ret = _light_work_pool.try_offer([this, controller, request, 
response, done]() {
+        exec_plan_fragment(controller, request, response, done);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
 void 
PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* 
controller,
                                                     const 
PExecPlanFragmentStartRequest* request,
                                                     PExecPlanFragmentResult* 
result,
                                                     google::protobuf::Closure* 
done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", 
controller);
-    auto scope = OpentelemetryScope {span};
-    brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->fragment_mgr()->start_query_execution(request);
-    st.to_protobuf(result->mutable_status());
+    bool ret = _light_work_pool.try_offer([this, controller, request, result, 
done]() {
+        auto span = 
telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+        auto scope = OpentelemetryScope {span};
+        brpc::ClosureGuard closure_guard(done);
+        auto st = _exec_env->fragment_mgr()->start_query_execution(request);
+        st.to_protobuf(result->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        result->mutable_status()->add_error_msgs("fail to offer request to the 
work pool");
+    }
 }
 
-void 
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* 
controller,
                                                    const 
PTabletWriterAddBlockRequest* request,
                                                    
PTabletWriterAddBlockResult* response,
                                                    google::protobuf::Closure* 
done) {
-    // TODO(zxy) delete in 1.2 version
-    google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, 
cntl);
+    bool ret = _heavy_work_pool.try_offer([this, controller, request, 
response, done]() {
+        // TODO(zxy) delete in 1.2 version
+        google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        
attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
 
-    _tablet_writer_add_block(cntl_base, request, response, new_done);
+        _tablet_writer_add_block(controller, request, response, new_done);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
 void PInternalServiceImpl::tablet_writer_add_block_by_http(
-        google::protobuf::RpcController* cntl_base, const 
::doris::PEmptyRequest* request,
+        google::protobuf::RpcController* controller, const 
::doris::PEmptyRequest* request,
         PTabletWriterAddBlockResult* response, google::protobuf::Closure* 
done) {
-    PTabletWriterAddBlockRequest* new_request = new 
PTabletWriterAddBlockRequest();
-    google::protobuf::Closure* new_done =
-            new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, 
done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    Status st = 
attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request,
-                                                                               
        cntl);
-    if (st.ok()) {
-        _tablet_writer_add_block(cntl_base, new_request, response, new_done);
-    } else {
-        st.to_protobuf(response->mutable_status());
+    bool ret = _heavy_work_pool.try_offer([this, controller, response, done]() 
{
+        PTabletWriterAddBlockRequest* new_request = new 
PTabletWriterAddBlockRequest();
+        google::protobuf::Closure* new_done =
+                new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, 
done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        Status st = 
attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(
+                new_request, cntl);
+        if (st.ok()) {
+            _tablet_writer_add_block(controller, new_request, response, 
new_done);
+        } else {
+            st.to_protobuf(response->mutable_status());
+        }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
-void 
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* 
controller,
                                                     const 
PTabletWriterAddBlockRequest* request,
                                                     
PTabletWriterAddBlockResult* response,
                                                     google::protobuf::Closure* 
done) {
-    VLOG_RPC << "tablet writer add block, id=" << request->id()
-             << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id()
-             << ", current_queued_size=" << 
_tablet_worker_pool.get_queue_size();
     int64_t submit_task_time_ns = MonotonicNanos();
-    _tablet_worker_pool.offer([request, response, done, submit_task_time_ns, 
this]() {
+    bool ret = _heavy_work_pool.try_offer([request, response, done, 
submit_task_time_ns, this]() {
         int64_t wait_execution_time_ns = MonotonicNanos() - 
submit_task_time_ns;
         brpc::ClosureGuard closure_guard(done);
         int64_t execution_time_ns = 0;
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
-
             auto st = _exec_env->load_channel_mgr()->add_batch(*request, 
response);
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add block failed, message=" << 
st
@@ -272,6 +343,12 @@ void 
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
         response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
         response->set_wait_execution_time_us(wait_execution_time_ns / 
NANOS_PER_MICRO);
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
 void 
PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController* 
cntl_base,
@@ -303,13 +380,13 @@ void 
PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
                                                     
PTabletWriterAddBatchResult* response,
                                                     google::protobuf::Closure* 
done) {
     VLOG_RPC << "tablet writer add batch, id=" << request->id()
-             << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id()
-             << ", current_queued_size=" << 
_tablet_worker_pool.get_queue_size();
+             << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id();
     // add batch maybe cost a lot of time, and this callback thread will be 
held.
     // this will influence query execution, because the pthreads under bthread 
may be
     // exhausted, so we put this to a local thread pool to process
     int64_t submit_task_time_ns = MonotonicNanos();
-    _tablet_worker_pool.offer([cntl_base, request, response, done, 
submit_task_time_ns, this]() {
+    bool ret = _heavy_work_pool.offer([cntl_base, request, response, done, 
submit_task_time_ns,
+                                       this]() {
         int64_t wait_execution_time_ns = MonotonicNanos() - 
submit_task_time_ns;
         brpc::ClosureGuard closure_guard(done);
         int64_t execution_time_ns = 0;
@@ -332,20 +409,32 @@ void 
PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
         response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
         response->set_wait_execution_time_us(wait_execution_time_ns / 
NANOS_PER_MICRO);
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
 void 
PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* 
controller,
                                                 const 
PTabletWriterCancelRequest* request,
                                                 PTabletWriterCancelResult* 
response,
                                                 google::protobuf::Closure* 
done) {
-    VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" 
<< request->index_id()
-             << ", sender_id=" << request->sender_id();
-    brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->load_channel_mgr()->cancel(*request);
-    if (!st.ok()) {
-        LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
-                     << ", index_id=" << request->index_id()
-                     << ", sender_id=" << request->sender_id();
+    bool ret = _light_work_pool.try_offer([this, request, done]() {
+        VLOG_RPC << "tablet writer cancel, id=" << request->id()
+                 << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id();
+        brpc::ClosureGuard closure_guard(done);
+        auto st = _exec_env->load_channel_mgr()->cancel(*request);
+        if (!st.ok()) {
+            LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
+                         << ", index_id=" << request->index_id()
+                         << ", sender_id=" << request->sender_id();
+        }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
     }
 }
 
@@ -377,313 +466,408 @@ Status PInternalServiceImpl::_exec_plan_fragment(const 
std::string& ser_request,
     }
 }
 
-void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* 
controller,
                                                 const 
PCancelPlanFragmentRequest* request,
                                                 PCancelPlanFragmentResult* 
result,
                                                 google::protobuf::Closure* 
done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", 
cntl_base);
-    auto scope = OpentelemetryScope {span};
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId tid;
-    tid.__set_hi(request->finst_id().hi());
-    tid.__set_lo(request->finst_id().lo());
-
-    Status st;
-    if (request->has_cancel_reason()) {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
-                  << ", reason: " << request->cancel_reason();
-        st = _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
-    } else {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
-        st = _exec_env->fragment_mgr()->cancel(tid);
-    }
-    if (!st.ok()) {
-        LOG(WARNING) << "cancel plan fragment failed, errmsg=" << st;
+    bool ret = _light_work_pool.try_offer([this, controller, request, result, 
done]() {
+        auto span = 
telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+        auto scope = OpentelemetryScope {span};
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId tid;
+        tid.__set_hi(request->finst_id().hi());
+        tid.__set_lo(request->finst_id().lo());
+
+        Status st = Status::OK();
+        if (request->has_cancel_reason()) {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid)
+                      << ", reason: " << request->cancel_reason();
+            _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+        } else {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid);
+            _exec_env->fragment_mgr()->cancel(tid);
+        }
+        // TODO: the logic seems useless, cancel only return Status::OK. 
remove it
+        st.to_protobuf(result->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        result->mutable_status()->add_error_msgs("fail to offer request to the 
work pool");
     }
-    st.to_protobuf(result->mutable_status());
 }
 
-void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* 
controller,
                                       const PFetchDataRequest* request, 
PFetchDataResult* result,
                                       google::protobuf::Closure* done) {
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
-    _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    bool ret = _heavy_work_pool.try_offer([this, controller, request, result, 
done]() {
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+        _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        result->mutable_status()->add_error_msgs("fail to offer request to the 
work pool");
+    }
 }
 
 void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* 
controller,
                                               const PFetchTableSchemaRequest* 
request,
                                               PFetchTableSchemaResult* result,
                                               google::protobuf::Closure* done) 
{
-    VLOG_RPC << "fetch table schema";
-    brpc::ClosureGuard closure_guard(done);
-    TFileScanRange file_scan_range;
-    Status st = Status::OK();
-    {
-        const uint8_t* buf = (const 
uint8_t*)(request->file_scan_range().data());
-        uint32_t len = request->file_scan_range().size();
-        st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+    bool ret = _heavy_work_pool.try_offer([request, result, done]() {
+        VLOG_RPC << "fetch table schema";
+        brpc::ClosureGuard closure_guard(done);
+        TFileScanRange file_scan_range;
+        Status st = Status::OK();
+        {
+            const uint8_t* buf = (const 
uint8_t*)(request->file_scan_range().data());
+            uint32_t len = request->file_scan_range().size();
+            st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+            if (!st.ok()) {
+                LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+                st.to_protobuf(result->mutable_status());
+                return;
+            }
+        }
+        if (file_scan_range.__isset.ranges == false) {
+            st = Status::InternalError("can not get TFileRangeDesc.");
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        if (file_scan_range.__isset.params == false) {
+            st = Status::InternalError("can not get TFileScanRangeParams.");
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        const TFileRangeDesc& range = file_scan_range.ranges.at(0);
+        const TFileScanRangeParams& params = file_scan_range.params;
+
+        std::unique_ptr<vectorized::GenericReader> reader(nullptr);
+        std::unique_ptr<RuntimeProfile> profile(new 
RuntimeProfile("FetchTableSchema"));
+        switch (params.format_type) {
+        case TFileFormatType::FORMAT_CSV_PLAIN:
+        case TFileFormatType::FORMAT_CSV_GZ:
+        case TFileFormatType::FORMAT_CSV_BZ2:
+        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+        case TFileFormatType::FORMAT_CSV_LZOP:
+        case TFileFormatType::FORMAT_CSV_DEFLATE: {
+            // file_slots is no use
+            std::vector<SlotDescriptor*> file_slots;
+            reader.reset(new vectorized::CsvReader(profile.get(), params, 
range, file_slots));
+            break;
+        }
+        case TFileFormatType::FORMAT_PARQUET: {
+            reader.reset(new vectorized::ParquetReader(params, range));
+            break;
+        }
+        case TFileFormatType::FORMAT_ORC: {
+            std::vector<std::string> column_names;
+            reader.reset(new vectorized::OrcReader(params, range, 
column_names, ""));
+            break;
+        }
+        case TFileFormatType::FORMAT_JSON: {
+            std::vector<SlotDescriptor*> file_slots;
+            reader.reset(new vectorized::NewJsonReader(profile.get(), params, 
range, file_slots));
+            break;
+        }
+        default:
+            st = Status::InternalError("Not supported file format in fetch 
table schema: {}",
+                                       params.format_type);
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        std::vector<std::string> col_names;
+        std::vector<TypeDescriptor> col_types;
+        st = reader->get_parsed_schema(&col_names, &col_types);
         if (!st.ok()) {
             LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
             st.to_protobuf(result->mutable_status());
             return;
         }
-    }
-    if (file_scan_range.__isset.ranges == false) {
-        st = Status::InternalError("can not get TFileRangeDesc.");
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    if (file_scan_range.__isset.params == false) {
-        st = Status::InternalError("can not get TFileScanRangeParams.");
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    const TFileRangeDesc& range = file_scan_range.ranges.at(0);
-    const TFileScanRangeParams& params = file_scan_range.params;
-
-    std::unique_ptr<vectorized::GenericReader> reader(nullptr);
-    std::unique_ptr<RuntimeProfile> profile(new 
RuntimeProfile("FetchTableSchema"));
-    switch (params.format_type) {
-    case TFileFormatType::FORMAT_CSV_PLAIN:
-    case TFileFormatType::FORMAT_CSV_GZ:
-    case TFileFormatType::FORMAT_CSV_BZ2:
-    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
-    case TFileFormatType::FORMAT_CSV_LZOP:
-    case TFileFormatType::FORMAT_CSV_DEFLATE: {
-        // file_slots is no use
-        std::vector<SlotDescriptor*> file_slots;
-        reader.reset(new vectorized::CsvReader(profile.get(), params, range, 
file_slots));
-        break;
-    }
-    case TFileFormatType::FORMAT_PARQUET: {
-        reader.reset(new vectorized::ParquetReader(params, range));
-        break;
-    }
-    case TFileFormatType::FORMAT_ORC: {
-        std::vector<std::string> column_names;
-        reader.reset(new vectorized::OrcReader(params, range, column_names, 
""));
-        break;
-    }
-    case TFileFormatType::FORMAT_JSON: {
-        std::vector<SlotDescriptor*> file_slots;
-        reader.reset(new vectorized::NewJsonReader(profile.get(), params, 
range, file_slots));
-        break;
-    }
-    default:
-        st = Status::InternalError("Not supported file format in fetch table 
schema: {}",
-                                   params.format_type);
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    std::vector<std::string> col_names;
-    std::vector<TypeDescriptor> col_types;
-    st = reader->get_parsed_schema(&col_names, &col_types);
-    if (!st.ok()) {
-        LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+        result->set_column_nums(col_names.size());
+        for (size_t idx = 0; idx < col_names.size(); ++idx) {
+            result->add_column_names(col_names[idx]);
+        }
+        for (size_t idx = 0; idx < col_types.size(); ++idx) {
+            PTypeDesc* type_desc = result->add_column_types();
+            col_types[idx].to_protobuf(type_desc);
+        }
         st.to_protobuf(result->mutable_status());
-        return;
-    }
-    result->set_column_nums(col_names.size());
-    for (size_t idx = 0; idx < col_names.size(); ++idx) {
-        result->add_column_names(col_names[idx]);
-    }
-    for (size_t idx = 0; idx < col_types.size(); ++idx) {
-        PTypeDesc* type_desc = result->add_column_types();
-        col_types[idx].to_protobuf(type_desc);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        result->mutable_status()->add_error_msgs("fail to offer request to the 
work pool");
     }
-    st.to_protobuf(result->mutable_status());
 }
 
 void PInternalServiceImpl::get_info(google::protobuf::RpcController* 
controller,
                                     const PProxyRequest* request, 
PProxyResult* response,
                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    // PProxyRequest is defined in gensrc/proto/internal_service.proto
-    // Currently it supports 2 kinds of requests:
-    // 1. get all kafka partition ids for given topic
-    // 2. get all kafka partition offsets for given topic and timestamp.
-    if (request->has_kafka_meta_request()) {
-        const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
-        if (!kafka_request.partition_id_for_latest_offsets().empty()) {
-            // get latest offsets for specified partition ids
-            std::vector<PIntegerPair> partition_offsets;
-            Status st = _exec_env->routine_load_task_executor()
-                                ->get_kafka_latest_offsets_for_partitions(
-                                        request->kafka_meta_request(), 
&partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        // PProxyRequest is defined in gensrc/proto/internal_service.proto
+        // Currently it supports 2 kinds of requests:
+        // 1. get all kafka partition ids for given topic
+        // 2. get all kafka partition offsets for given topic and timestamp.
+        if (request->has_kafka_meta_request()) {
+            const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
+            if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+                // get latest offsets for specified partition ids
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_latest_offsets_for_partitions(
+                                            request->kafka_meta_request(), 
&partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else if (!kafka_request.offset_times().empty()) {
-            // if offset_times() has elements, which means this request is to 
get offset by timestamp.
-            std::vector<PIntegerPair> partition_offsets;
-            Status st =
-                    
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
-                            request->kafka_meta_request(), &partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else if (!kafka_request.offset_times().empty()) {
+                // if offset_times() has elements, which means this request is 
to get offset by timestamp.
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_partition_offsets_for_times(
+                                            request->kafka_meta_request(), 
&partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else {
-            // get partition ids of topic
-            std::vector<int32_t> partition_ids;
-            Status st = 
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
-                    request->kafka_meta_request(), &partition_ids);
-            if (st.ok()) {
-                PKafkaMetaProxyResult* kafka_result = 
response->mutable_kafka_meta_result();
-                for (int32_t id : partition_ids) {
-                    kafka_result->add_partition_ids(id);
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else {
+                // get partition ids of topic
+                std::vector<int32_t> partition_ids;
+                Status st = 
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+                        request->kafka_meta_request(), &partition_ids);
+                if (st.ok()) {
+                    PKafkaMetaProxyResult* kafka_result = 
response->mutable_kafka_meta_result();
+                    for (int32_t id : partition_ids) {
+                        kafka_result->add_partition_ids(id);
+                    }
                 }
+                st.to_protobuf(response->mutable_status());
+                return;
             }
-            st.to_protobuf(response->mutable_status());
-            return;
         }
+        Status::OK().to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
-    Status::OK().to_protobuf(response->mutable_status());
 }
 
 void PInternalServiceImpl::update_cache(google::protobuf::RpcController* 
controller,
                                         const PUpdateCacheRequest* request,
                                         PCacheResponse* response, 
google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->update(request, response);
+    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->update(request, response);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->set_status(PCacheStatus::CANCELED);
+    }
 }
 
 void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* 
controller,
                                        const PFetchCacheRequest* request, 
PFetchCacheResult* result,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->fetch(request, result);
+    bool ret = _heavy_work_pool.try_offer([this, request, result, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->fetch(request, result);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        result->set_status(PCacheStatus::CANCELED);
+    }
 }
 
 void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* 
controller,
                                        const PClearCacheRequest* request, 
PCacheResponse* response,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->clear(request, response);
+    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->clear(request, response);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->set_status(PCacheStatus::CANCELED);
+    }
 }
 
 void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* 
controller,
                                         const ::doris::PMergeFilterRequest* 
request,
                                         ::doris::PMergeFilterResponse* 
response,
                                         ::google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
-    if (!st.ok()) {
-        LOG(WARNING) << "merge meet error" << st.to_string();
+    bool ret = _light_work_pool.try_offer([this, controller, request, 
response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "merge meet error" << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
-    st.to_protobuf(response->mutable_status());
 }
 
 void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* 
controller,
                                         const ::doris::PPublishFilterRequest* 
request,
                                         ::doris::PPublishFilterResponse* 
response,
                                         ::google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    UniqueId unique_id(request->query_id());
-    VLOG_NOTICE << "rpc apply_filter recv";
-    Status st = _exec_env->fragment_mgr()->apply_filter(request, 
&zero_copy_input_stream);
-    if (!st.ok()) {
-        LOG(WARNING) << "apply filter meet error: " << st.to_string();
+    bool ret = _light_work_pool.try_offer([this, controller, request, 
response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        UniqueId unique_id(request->query_id());
+        VLOG_NOTICE << "rpc apply_filter recv";
+        Status st = _exec_env->fragment_mgr()->apply_filter(request, 
&zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "apply filter meet error: " << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
-    st.to_protobuf(response->mutable_status());
 }
 
 void PInternalServiceImpl::send_data(google::protobuf::RpcController* 
controller,
                                      const PSendDataRequest* request, 
PSendDataResult* response,
                                      google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        for (int i = 0; i < request->data_size(); ++i) {
-            std::unique_ptr<PDataRow> row(new PDataRow());
-            row->CopyFrom(request->data(i));
-            Status s = pipe->append(std::move(row));
-            if (!s.ok()) {
-                response->mutable_status()->set_status_code(1);
-                response->mutable_status()->add_error_msgs(s.to_string());
-                return;
+    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId fragment_instance_id;
+        fragment_instance_id.hi = request->fragment_instance_id().hi();
+        fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+        if (pipe == nullptr) {
+            response->mutable_status()->set_status_code(1);
+            response->mutable_status()->add_error_msgs("pipe is null");
+        } else {
+            for (int i = 0; i < request->data_size(); ++i) {
+                std::unique_ptr<PDataRow> row(new PDataRow());
+                row->CopyFrom(request->data(i));
+                Status s = pipe->append(std::move(row));
+                if (!s.ok()) {
+                    response->mutable_status()->set_status_code(1);
+                    response->mutable_status()->add_error_msgs(s.to_string());
+                    return;
+                }
             }
+            response->mutable_status()->set_status_code(0);
         }
-        response->mutable_status()->set_status_code(0);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
 void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
                                   const PCommitRequest* request, 
PCommitResult* response,
                                   google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        pipe->finish();
-        response->mutable_status()->set_status_code(0);
+    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId fragment_instance_id;
+        fragment_instance_id.hi = request->fragment_instance_id().hi();
+        fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+        if (pipe == nullptr) {
+            response->mutable_status()->set_status_code(1);
+            response->mutable_status()->add_error_msgs("pipe is null");
+        } else {
+            pipe->finish();
+            response->mutable_status()->set_status_code(0);
+        }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
 void PInternalServiceImpl::rollback(google::protobuf::RpcController* 
controller,
                                     const PRollbackRequest* request, 
PRollbackResult* response,
                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        pipe->cancel("rollback");
-        response->mutable_status()->set_status_code(0);
+    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId fragment_instance_id;
+        fragment_instance_id.hi = request->fragment_instance_id().hi();
+        fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+        if (pipe == nullptr) {
+            response->mutable_status()->set_status_code(1);
+            response->mutable_status()->add_error_msgs("pipe is null");
+        } else {
+            pipe->cancel("rollback");
+            response->mutable_status()->set_status_code(0);
+        }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
-void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* 
controller,
                                               const PConstantExprRequest* 
request,
                                               PConstantExprResult* response,
                                               google::protobuf::Closure* done) 
{
-    brpc::ClosureGuard closure_guard(done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-
-    Status st = Status::OK();
-    if (request->has_request()) {
+    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        Status st = Status::OK();
         st = _fold_constant_expr(request->request(), response);
-    } else {
-        // TODO(yangzhengguo) this is just for compatible with old version, 
this should be removed in the release 0.15
-        st = _fold_constant_expr(cntl->request_attachment().to_string(), 
response);
-    }
-    if (!st.ok()) {
-        LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st;
+        if (!st.ok()) {
+            LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st;
+        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
-    st.to_protobuf(response->mutable_status());
 }
 
 Status PInternalServiceImpl::_fold_constant_expr(const std::string& 
ser_request,
@@ -700,31 +884,48 @@ Status PInternalServiceImpl::_fold_constant_expr(const 
std::string& ser_request,
     return FoldConstantExecutor().fold_constant_vexpr(t_request, response);
 }
 
-void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* 
controller,
                                           const PTransmitDataParams* request,
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done) {
-    // TODO(zxy) delete in 1.2 version
-    google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
+    bool ret = _heavy_work_pool.try_offer([this, controller, request, 
response, done]() {
+        // TODO(zxy) delete in 1.2 version
+        google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
 
-    _transmit_block(cntl_base, request, response, new_done, Status::OK());
+        _transmit_block(controller, request, response, new_done, Status::OK());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
-void 
PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* 
controller,
                                                   const PEmptyRequest* request,
                                                   PTransmitDataResult* 
response,
                                                   google::protobuf::Closure* 
done) {
-    PTransmitDataParams* new_request = new PTransmitDataParams();
-    google::protobuf::Closure* new_done =
-            new NewHttpClosure<PTransmitDataParams>(new_request, done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    Status st = 
attachment_extract_request_contain_block<PTransmitDataParams>(new_request, 
cntl);
-    _transmit_block(cntl_base, new_request, response, new_done, st);
+    bool ret = _heavy_work_pool.try_offer([this, controller, response, done]() 
{
+        PTransmitDataParams* new_request = new PTransmitDataParams();
+        google::protobuf::Closure* new_done =
+                new NewHttpClosure<PTransmitDataParams>(new_request, done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        Status st =
+                
attachment_extract_request_contain_block<PTransmitDataParams>(new_request, 
cntl);
+        _transmit_block(controller, new_request, response, new_done, st);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
-void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* 
controller,
                                            const PTransmitDataParams* request,
                                            PTransmitDataResult* response,
                                            google::protobuf::Closure* done,
@@ -762,25 +963,34 @@ void 
PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
                                              const PCheckRPCChannelRequest* 
request,
                                              PCheckRPCChannelResponse* 
response,
                                              google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    response->mutable_status()->set_status_code(0);
-    if (request->data().size() != request->size()) {
-        std::stringstream ss;
-        ss << "data size not same, expected: " << request->size()
-           << ", actual: " << request->data().size();
-        response->mutable_status()->add_error_msgs(ss.str());
-        response->mutable_status()->set_status_code(1);
-
-    } else {
-        Md5Digest digest;
-        digest.update(static_cast<const void*>(request->data().c_str()), 
request->data().size());
-        digest.digest();
-        if (!iequal(digest.hex(), request->md5())) {
+    bool ret = _light_work_pool.try_offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(0);
+        if (request->data().size() != request->size()) {
             std::stringstream ss;
-            ss << "md5 not same, expected: " << request->md5() << ", actual: " 
<< digest.hex();
+            ss << "data size not same, expected: " << request->size()
+               << ", actual: " << request->data().size();
             response->mutable_status()->add_error_msgs(ss.str());
             response->mutable_status()->set_status_code(1);
+
+        } else {
+            Md5Digest digest;
+            digest.update(static_cast<const void*>(request->data().c_str()),
+                          request->data().size());
+            digest.digest();
+            if (!iequal(digest.hex(), request->md5())) {
+                std::stringstream ss;
+                ss << "md5 not same, expected: " << request->md5() << ", 
actual: " << digest.hex();
+                response->mutable_status()->add_error_msgs(ss.str());
+                response->mutable_status()->set_status_code(1);
+            }
         }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
@@ -788,44 +998,60 @@ void 
PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co
                                              const PResetRPCChannelRequest* 
request,
                                              PResetRPCChannelResponse* 
response,
                                              google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    response->mutable_status()->set_status_code(0);
-    if (request->all()) {
-        int size = 
ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
-        if (size > 0) {
-            std::vector<std::string> endpoints;
-            
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
-            ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
-            *response->mutable_channels() = {endpoints.begin(), 
endpoints.end()};
-        }
-    } else {
-        for (const std::string& endpoint : request->endpoints()) {
-            if 
(!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
-                response->mutable_status()->add_error_msgs(endpoint + ": not 
found.");
-                continue;
+    bool ret = _light_work_pool.try_offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(0);
+        if (request->all()) {
+            int size = 
ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
+            if (size > 0) {
+                std::vector<std::string> endpoints;
+                
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
+                ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
+                *response->mutable_channels() = {endpoints.begin(), 
endpoints.end()};
             }
+        } else {
+            for (const std::string& endpoint : request->endpoints()) {
+                if 
(!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
+                    response->mutable_status()->add_error_msgs(endpoint + ": 
not found.");
+                    continue;
+                }
 
-            if 
(ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
-                response->add_channels(endpoint);
-            } else {
-                response->mutable_status()->add_error_msgs(endpoint + ": reset 
failed.");
+                if 
(ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
+                    response->add_channels(endpoint);
+                } else {
+                    response->mutable_status()->add_error_msgs(endpoint + ": 
reset failed.");
+                }
+            }
+            if (request->endpoints_size() != response->channels_size()) {
+                response->mutable_status()->set_status_code(1);
             }
         }
-        if (request->endpoints_size() != response->channels_size()) {
-            response->mutable_status()->set_status_code(1);
-        }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
-void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* 
controller,
                                       const PHandShakeRequest* request,
                                       PHandShakeResponse* response,
                                       google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    if (request->has_hello()) {
-        response->set_hello(request->hello());
+    bool ret = _light_work_pool.try_offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        if (request->has_hello()) {
+            response->set_hello(request->hello());
+        }
+        response->mutable_status()->set_status_code(0);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
-    response->mutable_status()->set_status_code(0);
 }
 
 void PInternalServiceImpl::request_slave_tablet_pull_rowset(
@@ -840,7 +1066,8 @@ void 
PInternalServiceImpl::request_slave_tablet_pull_rowset(
     int64_t brpc_port = request->brpc_port();
     std::string token = request->token();
     int64_t node_id = request->node_id();
-    _slave_replica_worker_pool.offer([=]() {
+    bool ret = _heavy_work_pool.try_offer([rowset_meta_pb, host, brpc_port, 
node_id, segments_size,
+                                           http_port, token, rowset_path, 
this]() {
         TabletSharedPtr tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
                 rowset_meta_pb.tablet_id(), 
rowset_meta_pb.tablet_schema_hash());
         if (tablet == nullptr) {
@@ -981,6 +1208,12 @@ void 
PInternalServiceImpl::request_slave_tablet_pull_rowset(
         _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
                                     rowset_meta->tablet_id(), node_id, true);
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+        return; // keep CANCELLED; done is left to the function-level guard (confirm)
+    }
     Status::OK().to_protobuf(response->mutable_status());
 }
 
@@ -1039,14 +1272,22 @@ void 
PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
 void PInternalServiceImpl::response_slave_tablet_pull_rowset(
         google::protobuf::RpcController* controller, const 
PTabletWriteSlaveDoneRequest* request,
         PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* 
done) {
-    brpc::ClosureGuard closure_guard(done);
-    VLOG_CRITICAL
-            << "receive the result of slave replica pull rowset from slave 
replica. slave server="
-            << request->node_id() << ", is_succeed=" << request->is_succeed()
-            << ", tablet_id=" << request->tablet_id() << ", txn_id=" << 
request->txn_id();
-    StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
-            request->txn_id(), request->tablet_id(), request->node_id(), 
request->is_succeed());
-    Status::OK().to_protobuf(response->mutable_status());
+    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        VLOG_CRITICAL << "receive the result of slave replica pull rowset from 
slave replica. "
+                         "slave server="
+                      << request->node_id() << ", is_succeed=" << 
request->is_succeed()
+                      << ", tablet_id=" << request->tablet_id() << ", txn_id=" 
<< request->txn_id();
+        
StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
+                request->txn_id(), request->tablet_id(), request->node_id(), 
request->is_succeed());
+        Status::OK().to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
 } // namespace doris
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index 3ea3655974..e5855d98f3 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -194,8 +194,13 @@ private:
 
 private:
     ExecEnv* _exec_env;
-    PriorityThreadPool _tablet_worker_pool;
-    PriorityThreadPool _slave_replica_worker_pool;
+
+    // every brpc service request should put into thread pool
+    // the reason see issue #16634
+    // define the interface for reading and writing data as heavy interface
+    // otherwise as light interface
+    PriorityThreadPool _heavy_work_pool;
+    PriorityThreadPool _light_work_pool;
 };
 
 } // namespace doris
diff --git a/be/src/util/blocking_priority_queue.hpp 
b/be/src/util/blocking_priority_queue.hpp
index 29060613c8..197b709718 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -137,6 +137,18 @@ public:
         return true;
     }
 
+    // Return false if queue full or has been shutdown.
+    bool try_put(const T& val) {
+        std::unique_lock<std::mutex> unique_lock(_lock);
+        if (_queue.size() < _max_element && !_shutdown) {
+            _queue.push(val);
+            unique_lock.unlock();
+            _get_cv.notify_one();
+            return true;
+        }
+        return false;
+    }
+
     // Shut down the queue. Wakes up all threads waiting on blocking_get or 
blocking_put.
     void shutdown() {
         {
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 10bcd28393..786a5625d9 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -213,6 +213,16 @@ public:
     IntCounter* upload_rowset_count;
     IntCounter* upload_fail_count;
 
+    UIntGauge* light_work_pool_queue_size;
+    UIntGauge* heavy_work_pool_queue_size;
+    UIntGauge* heavy_work_active_threads;
+    UIntGauge* light_work_active_threads;
+
+    UIntGauge* heavy_work_pool_max_queue_size;
+    UIntGauge* light_work_pool_max_queue_size;
+    UIntGauge* heavy_work_max_threads;
+    UIntGauge* light_work_max_threads;
+
     static DorisMetrics* instance() {
         static DorisMetrics instance;
         return &instance;
diff --git a/be/src/util/hdfs_storage_backend.cpp 
b/be/src/util/hdfs_storage_backend.cpp
index 6f3baf2d96..083f7e3c21 100644
--- a/be/src/util/hdfs_storage_backend.cpp
+++ b/be/src/util/hdfs_storage_backend.cpp
@@ -17,6 +17,7 @@
 
 #include "util/hdfs_storage_backend.h"
 
+#include "io/fs/err_utils.h"
 #include "io/hdfs_file_reader.h"
 #include "io/hdfs_reader_writer.h"
 #include "io/hdfs_writer.h"
@@ -125,7 +126,7 @@ Status HDFSStorageBackend::list(const std::string& 
remote_path, bool contain_md5
     std::string normal_str = parse_path(remote_path);
     int exists = hdfsExists(_hdfs_fs, normal_str.c_str());
     if (exists != 0) {
-        LOG(INFO) << "path does not exist: " << normal_str << ", err: " << 
strerror(errno);
+        LOG(INFO) << "path does not exist: " << normal_str << ", err: " << 
io::hdfs_error();
         return Status::OK();
     }
 
@@ -134,7 +135,7 @@ Status HDFSStorageBackend::list(const std::string& 
remote_path, bool contain_md5
     if (files_info == nullptr) {
         std::stringstream ss;
         ss << "failed to list files from remote path: " << normal_str
-           << ", err: " << strerror(errno);
+           << ", err: " << io::hdfs_error();
         LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
diff --git a/be/src/util/hdfs_storage_backend.h 
b/be/src/util/hdfs_storage_backend.h
index acbf18d2d0..9f1b27f6ac 100644
--- a/be/src/util/hdfs_storage_backend.h
+++ b/be/src/util/hdfs_storage_backend.h
@@ -17,8 +17,7 @@
 
 #pragma once
 
-#include <hdfs/hdfs.h>
-
+#include "io/fs/hdfs.h"
 #include "io/hdfs_builder.h"
 #include "util/storage_backend.h"
 
diff --git a/be/src/util/hdfs_util.cpp b/be/src/util/hdfs_util.cpp
index b58fc75e46..e72b08caef 100644
--- a/be/src/util/hdfs_util.cpp
+++ b/be/src/util/hdfs_util.cpp
@@ -21,6 +21,7 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "io/fs/err_utils.h"
 
 namespace doris {
 
@@ -33,7 +34,7 @@ hdfsFS HDFSHandle::create_hdfs_fs(HDFSCommonBuilder& 
hdfs_builder) {
     hdfsFS hdfs_fs = hdfsBuilderConnect(hdfs_builder.get());
     if (hdfs_fs == nullptr) {
         LOG(WARNING) << "connect to hdfs failed."
-                     << ", error: " << hdfsGetLastError();
+                     << ", error: " << io::hdfs_error();
         return nullptr;
     }
     return hdfs_fs;
diff --git a/be/src/util/hdfs_util.h b/be/src/util/hdfs_util.h
index f7bfc14b3a..a872cfe89d 100644
--- a/be/src/util/hdfs_util.h
+++ b/be/src/util/hdfs_util.h
@@ -17,13 +17,12 @@
 
 #pragma once
 
-#include <hdfs/hdfs.h>
-
 #include <map>
 #include <memory>
 #include <string>
 
 #include "common/status.h"
+#include "io/fs/hdfs.h"
 #include "io/hdfs_builder.h"
 
 namespace doris {
@@ -40,4 +39,4 @@ private:
     HDFSHandle() {}
 };
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 64a8e09bec..01558cb2a9 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -24,6 +24,8 @@
 #include <cstdlib>
 #include <filesystem>
 #include <sstream>
+#include <string>
+#include <vector>
 
 #include "common/config.h"
 #include "gutil/once.h"
@@ -37,10 +39,10 @@ namespace doris {
 
 namespace {
 JavaVM* g_vm;
-GoogleOnceType g_vm_once = GOOGLE_ONCE_INIT;
+[[maybe_unused]] std::once_flag g_vm_once;
 
 const std::string GetDorisJNIClasspath() {
-    const auto* classpath = getenv("DORIS_JNI_CLASSPATH_PARAMETER");
+    const auto* classpath = getenv("DORIS_CLASSPATH");
     if (classpath) {
         return classpath;
     } else {
@@ -66,84 +68,50 @@ const std::string GetDorisJNIClasspath() {
     }
 }
 
-void FindOrCreateJavaVM() {
+// Only used on non-x86 platform
+[[maybe_unused]] void FindOrCreateJavaVM() {
     int num_vms;
-    int rv = LibJVMLoader::JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms);
+    int rv = JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms);
     if (rv == 0) {
-        JavaVMOption* options;
-        auto classpath = GetDorisJNIClasspath();
-        // The following 4 opts are default opts,
-        // they can be override by JAVA_OPTS env var.
-        std::string heap_size = fmt::format("-Xmx{}", 
config::jvm_max_heap_size);
-        std::string log_path = fmt::format("-DlogPath={}/log/udf-jdbc.log", 
getenv("DORIS_HOME"));
-        std::string critical_jni = "-XX:-CriticalJNINatives";
-        std::string max_fd_limit = "-XX:-MaxFDLimit";
+        std::vector<std::string> options;
 
         char* java_opts = getenv("JAVA_OPTS");
-        int no_args;
         if (java_opts == nullptr) {
-            no_args = 4; // classpath, heapsize, log path, critical
+            options = {
+                    GetDorisJNIClasspath(), fmt::format("-Xmx{}", "1g"),
+                    fmt::format("-DlogPath={}/log/jni.log", 
getenv("DORIS_HOME")),
+                    fmt::format("-Dsun.java.command={}", "DorisBE"), 
"-XX:-CriticalJNINatives",
 #ifdef __APPLE__
-            no_args++; // -XX:-MaxFDLimit
-#endif
-            options = (JavaVMOption*)calloc(no_args, sizeof(JavaVMOption));
-            options[0].optionString = const_cast<char*>(classpath.c_str());
-            options[1].optionString = const_cast<char*>(heap_size.c_str());
-            options[2].optionString = const_cast<char*>(log_path.c_str());
-            options[3].optionString = const_cast<char*>(critical_jni.c_str());
-#ifdef __APPLE__
-            // On macOS, we should disable MaxFDLimit, otherwise the 
RLIMIT_NOFILE
-            // will be assigned the minimum of OPEN_MAX (10240) and rlim_cur 
(See src/hotspot/os/bsd/os_bsd.cpp)
-            // and it can not pass the check performed by storage engine.
-            // The newer JDK has fixed this issue.
-            options[4].optionString = const_cast<char*>(max_fd_limit.c_str());
+                    // On macOS, we should disable MaxFDLimit, otherwise the 
RLIMIT_NOFILE
+                    // will be assigned the minimum of OPEN_MAX (10240) and 
rlim_cur (See src/hotspot/os/bsd/os_bsd.cpp)
+                    // and it can not pass the check performed by storage 
engine.
+                    // The newer JDK has fixed this issue.
+                    "-XX:-MaxFDLimit"
 #endif
+            };
         } else {
-            // user specified opts
-            // 1. find the number of args
-            java_opts = strdup(java_opts);
-            char *str, *token, *save_ptr;
-            char jvm_arg_delims[] = " ";
-            for (no_args = 1, str = java_opts;; no_args++, str = nullptr) {
-                token = strtok_r(str, jvm_arg_delims, &save_ptr);
-                if (token == nullptr) {
-                    break;
-                }
-            }
-            free(java_opts);
-            // 2. set args
-            options = (JavaVMOption*)calloc(no_args, sizeof(JavaVMOption));
-            options[0].optionString = const_cast<char*>(classpath.c_str());
-            java_opts = getenv("JAVA_OPTS");
-            if (java_opts != NULL) {
-                java_opts = strdup(java_opts);
-                for (no_args = 1, str = java_opts;; no_args++, str = nullptr) {
-                    token = strtok_r(str, jvm_arg_delims, &save_ptr);
-                    if (token == nullptr) {
-                        break;
-                    }
-                    options[no_args].optionString = token;
-                }
-            }
+            std::istringstream stream(java_opts);
+            options = 
std::vector<std::string>(std::istream_iterator<std::string> {stream},
+                                               
std::istream_iterator<std::string>());
+            options.push_back(GetDorisJNIClasspath());
+        }
+        std::unique_ptr<JavaVMOption[]> jvm_options(new 
JavaVMOption[options.size()]);
+        for (int i = 0; i < options.size(); ++i) {
+            jvm_options[i] = {const_cast<char*>(options[i].c_str()), nullptr};
         }
 
         JNIEnv* env;
         JavaVMInitArgs vm_args;
         vm_args.version = JNI_VERSION_1_8;
-        vm_args.options = options;
-        vm_args.nOptions = no_args;
+        vm_args.options = jvm_options.get();
+        vm_args.nOptions = options.size();
         // Set it to JNI_FALSE because JNI_TRUE will let JVM ignore the max 
size config.
         vm_args.ignoreUnrecognized = JNI_FALSE;
 
-        jint res = LibJVMLoader::JNI_CreateJavaVM(&g_vm, (void**)&env, 
&vm_args);
+        jint res = JNI_CreateJavaVM(&g_vm, (void**)&env, &vm_args);
         if (JNI_OK != res) {
             DCHECK(false) << "Failed to create JVM, code= " << res;
         }
-
-        if (java_opts != nullptr) {
-            free(java_opts);
-        }
-        free(options);
     } else {
         CHECK_EQ(rv, 0) << "Could not find any created Java VM";
         CHECK_EQ(num_vms, 1) << "No VMs returned";
@@ -196,7 +164,8 @@ Status JniLocalFrame::push(JNIEnv* env, int max_local_ref) {
 Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) {
     DCHECK(!tls_env_) << "Call GetJNIEnv() fast path";
 
-    GoogleOnceInit(&g_vm_once, &FindOrCreateJavaVM);
+#ifdef USE_LIBHDFS3
+    std::call_once(g_vm_once, FindOrCreateJavaVM);
     int rc = g_vm->GetEnv(reinterpret_cast<void**>(&tls_env_), 
JNI_VERSION_1_8);
     if (rc == JNI_EDETACHED) {
         rc = g_vm->AttachCurrentThread((void**)&tls_env_, nullptr);
@@ -204,6 +173,10 @@ Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) {
     if (rc != 0 || tls_env_ == nullptr) {
         return Status::InternalError("Unable to get JVM: {}", rc);
     }
+#else
+    // the hadoop libhdfs will do all the stuff
+    tls_env_ = getJNIEnv();
+#endif
     *env = tls_env_;
     return Status::OK();
 }
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index 0e551f17cf..dfef2d3be7 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -23,6 +23,11 @@
 #include "gutil/macros.h"
 #include "util/thrift_util.h"
 
+#ifdef USE_HADOOP_HDFS
+// defined in hadoop_hdfs/hdfs.h
+extern "C" JNIEnv* getJNIEnv(void);
+#endif
+
 namespace doris {
 
 #define RETURN_ERROR_IF_EXC(env)                                     \
diff --git a/be/src/util/libjvm_loader.cpp b/be/src/util/libjvm_loader.cpp
index 127d28c2de..6175da6081 100644
--- a/be/src/util/libjvm_loader.cpp
+++ b/be/src/util/libjvm_loader.cpp
@@ -25,6 +25,15 @@
 
 #include "common/status.h"
 
+_JNI_IMPORT_OR_EXPORT_ jint JNICALL JNI_GetCreatedJavaVMs(JavaVM** vm_buf, 
jsize bufLen,
+                                                          jsize* numVMs) {
+    return doris::LibJVMLoader::JNI_GetCreatedJavaVMs(vm_buf, bufLen, numVMs);
+}
+
+_JNI_IMPORT_OR_EXPORT_ jint JNICALL JNI_CreateJavaVM(JavaVM** pvm, void** 
penv, void* args) {
+    return doris::LibJVMLoader::JNI_CreateJavaVM(pvm, penv, args);
+}
+
 namespace {
 
 #ifndef __APPLE__
diff --git a/be/src/util/priority_thread_pool.hpp 
b/be/src/util/priority_thread_pool.hpp
index 32fc637970..1c1bf8117e 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -54,7 +54,7 @@ public:
     //     queue exceeds this size, subsequent calls to Offer will block until 
there is
     //     capacity available.
     PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const 
std::string& name)
-            : _work_queue(queue_size), _shutdown(false), _name(name) {
+            : _work_queue(queue_size), _shutdown(false), _name(name), 
_active_threads(0) {
         for (int i = 0; i < num_threads; ++i) {
             _threads.create_thread(
                     
std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
@@ -86,6 +86,11 @@ public:
         return _work_queue.blocking_put(task);
     }
 
+    virtual bool try_offer(WorkFunction func) {
+        PriorityThreadPool::Task task = {0, func, 0};
+        return _work_queue.try_put(task);
+    }
+
     // Shuts the thread pool down, causing the work queue to cease accepting 
offered work
     // and the worker threads to terminate once they have processed their 
current work item.
     // Returns once the shutdown flag has been set, does not wait for the 
threads to
@@ -100,6 +105,7 @@ public:
     virtual void join() { _threads.join_all(); }
 
     virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
+    virtual uint32_t get_active_threads() const { return _active_threads; }
 
     // Blocks until the work queue is empty, and then calls shutdown to stop 
the worker
     // threads and Join to wait until they are finished.
@@ -135,7 +141,9 @@ private:
         while (!is_shutdown()) {
             Task task;
             if (_work_queue.blocking_get(&task)) {
+                _active_threads++;
                 task.work_function();
+                _active_threads--;
             }
             if (_work_queue.get_size() == 0) {
                 _empty_cv.notify_all();
@@ -150,6 +158,7 @@ private:
     // Set to true when threads should stop doing work and terminate.
     std::atomic<bool> _shutdown;
     std::string _name;
+    std::atomic<int> _active_threads;
 };
 
 } // namespace doris
diff --git a/bin/start_be.sh b/bin/start_be.sh
index 89c40947c0..231986940e 100755
--- a/bin/start_be.sh
+++ b/bin/start_be.sh
@@ -20,7 +20,8 @@ set -eo pipefail
 
 curdir="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
 
-if [[ "$(uname -s)" == 'Darwin' ]] && command -v brew &>/dev/null; then
+MACHINE_OS=$(uname -s)
+if [[ "${MACHINE_OS}" == 'Darwin' ]] && command -v brew &>/dev/null; then
     PATH="$(brew --prefix)/opt/gnu-getopt/bin:${PATH}"
     export PATH
 fi
@@ -70,16 +71,36 @@ if [[ "$(uname -s)" != 'Darwin' ]]; then
     fi
 fi
 
-# add libs to CLASSPATH
+# add java libs
 for f in "${DORIS_HOME}/lib"/*.jar; do
-    if [[ -z "${DORIS_JNI_CLASSPATH_PARAMETER}" ]]; then
-        export DORIS_JNI_CLASSPATH_PARAMETER="${f}"
+    if [[ -z "${DORIS_CLASSPATH}" ]]; then
+        export DORIS_CLASSPATH="${f}"
     else
-        export 
DORIS_JNI_CLASSPATH_PARAMETER="${f}:${DORIS_JNI_CLASSPATH_PARAMETER}"
+        export DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
     fi
 done
-# DORIS_JNI_CLASSPATH_PARAMETER is used to configure additional jar path to 
jvm. e.g. -Djava.class.path=$DORIS_HOME/lib/java-udf.jar
-export 
DORIS_JNI_CLASSPATH_PARAMETER="-Djava.class.path=${DORIS_JNI_CLASSPATH_PARAMETER}"
+
+if [[ -d "${DORIS_HOME}/lib/hadoop_hdfs/" ]]; then
+    # add hadoop libs
+    for f in "${DORIS_HOME}/lib/hadoop_hdfs/common"/*.jar; do
+        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+    done
+    for f in "${DORIS_HOME}/lib/hadoop_hdfs/common/lib"/*.jar; do
+        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+    done
+    for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs"/*.jar; do
+        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+    done
+    for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs/lib"/*.jar; do
+        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+    done
+fi
+
+# CLASSPATH and LIBHDFS_OPTS are used by hadoop libhdfs.
+# conf/ is put on CLASSPATH so that hadoop libhdfs can read the .xml config files in conf/
+export CLASSPATH="${DORIS_HOME}/conf/:${DORIS_CLASSPATH}"
+# DORIS_CLASSPATH is for self-managed jni
+export DORIS_CLASSPATH="-Djava.class.path=${DORIS_CLASSPATH}"
 
 jdk_version() {
     local java_cmd="${1}"
@@ -230,11 +251,28 @@ set_tcmalloc_heap_limit() {
 
 # set_tcmalloc_heap_limit || exit 1
 
-## set hdfs conf
+## set hdfs3 conf
 if [[ -f "${DORIS_HOME}/conf/hdfs-site.xml" ]]; then
     export LIBHDFS3_CONF="${DORIS_HOME}/conf/hdfs-site.xml"
 fi
 
+if [[ -z ${JAVA_OPTS} ]]; then
+    # set default JAVA_OPTS
+    CUR_DATE=$(date +%Y%m%d-%H%M%S)
+    JAVA_OPTS="-Xmx1024m -DlogPath=${DORIS_HOME}/log/jni.log 
-Xloggc:${DORIS_HOME}/log/be.gc.log.${CUR_DATE} -Dsun.java.command=DorisBE 
-XX:-CriticalJNINatives"
+fi
+
+if [[ "${MACHINE_OS}" == "Darwin" ]]; then
+    JAVA_OPTS="${JAVA_OPTS} -XX:-MaxFDLimit"
+fi
+
+# set LIBHDFS_OPTS for hadoop libhdfs
+export LIBHDFS_OPTS="${JAVA_OPTS}"
+
+#echo "CLASSPATH: ${CLASSPATH}"
+#echo "LD_LIBRARY_PATH: ${LD_LIBRARY_PATH}"
+#echo "LIBHDFS_OPTS: ${LIBHDFS_OPTS}"
+
 # see 
https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
 export 
JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:30000,dirty_decay_ms:30000,oversize_threshold:0,lg_tcache_max:16,prof_prefix:jeprof.out"
 
diff --git a/build.sh b/build.sh
index 9eaa16d8b6..d8296e9bbf 100755
--- a/build.sh
+++ b/build.sh
@@ -539,6 +539,10 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then
     cp -r -p "${DORIS_HOME}/be/output/bin"/* "${DORIS_OUTPUT}/be/bin"/
     cp -r -p "${DORIS_HOME}/be/output/conf"/* "${DORIS_OUTPUT}/be/conf"/
 
+    if [[ -d "${DORIS_THIRDPARTY}/installed/lib/hadoop_hdfs/" ]]; then
+        cp -r -p "${DORIS_THIRDPARTY}/installed/lib/hadoop_hdfs/" 
"${DORIS_OUTPUT}/be/lib/"
+    fi
+
     if [[ "${BUILD_JAVA_UDF}" -eq 0 ]]; then
         echo -e "\033[33;1mWARNNING: \033[37;1mDisable Java UDF support in 
be.conf due to the BE was built without Java UDF.\033[0m"
         cat >>"${DORIS_OUTPUT}/be/conf/be.conf" <<EOF
diff --git a/conf/be.conf b/conf/be.conf
index 240e7ee920..cc1b8f6c59 100644
--- a/conf/be.conf
+++ b/conf/be.conf
@@ -17,8 +17,8 @@
 
 PPROF_TMPDIR="$DORIS_HOME/log/"
 
-# if JAVA_OPTS is set, it will override the jvm opts for BE jvm.
-#JAVA_OPTS="-Xmx8192m -DlogPath=$DORIS_HOME/log/udf-jdbc.log 
-Djava.compiler=NONE -XX::-CriticalJNINatives"
+CUR_DATE=`date +%Y%m%d-%H%M%S`
+JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log 
-Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Dsun.java.command=DorisBE 
-XX:-CriticalJNINatives"
 
 # since 1.2, the JAVA_HOME need to be set to run BE process.
 # JAVA_HOME=/path/to/jdk/
diff --git 
a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md 
b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 610125abe3..464c59c309 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -295,6 +295,12 @@ curl http://be_host:webserver_port/metrics?type=json
 |`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 
如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
 |`doris_be_all_rowsets_num`| | Num | 当前所有 rowset 的个数 | | P0 |
 |`doris_be_all_segments_num`| | Num | 当前所有 segment 的个数 | | P0 |
+|`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数| | P0 |
+|`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数| | P0 |
+|`doris_be_heavy_work_pool_queue_size`| | Num | brpc 
heavy线程池队列最大长度,超过则阻塞提交work| | P0 |
+|`doris_be_light_work_pool_queue_size`| | Num | brpc 
light线程池队列最大长度,超过则阻塞提交work| | P0 |
+|`doris_be_heavy_work_active_threads`| | Num | brpc heavy线程池活跃线程数| | P0 |
+|`doris_be_light_work_active_threads`| | Num | brpc light线程池活跃线程数| | P0 |
 
 ### 机器监控
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index 28ecbea7a7..44c7bc967e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -271,7 +271,7 @@ public class BeLoadRebalancer extends Rebalancer {
                 if (lowBackend == null) {
                     continue;
                 }
-                if (hosts.contains(lowBackend.getHost())) {
+                if (!Config.allow_replica_on_same_host && 
hosts.contains(lowBackend.getHost())) {
                     continue;
                 }
 
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 95e1887b26..197d38dc78 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -243,6 +243,7 @@ enum PCacheStatus {
     INVALID_KEY_RANGE = 6;
     DATA_OVERDUE = 7;
     EMPTY_DATA = 8;
+    CANCELED = 9;
 };
 
 enum CacheType {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to