This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 890a8bb098 [refactor] intro pthread pool for brpc and hadoop libhdfs (#18513)
890a8bb098 is described below
commit 890a8bb098a38019d949e3b261929b22bdeee611
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Apr 21 22:05:29 2023 +0800
[refactor] intro pthread pool for brpc and hadoop libhdfs (#18513)
1. cherry-pick [Improvement](brpc) Using a thread pool for RPC service
avoiding std::mutex block brpc::bthread (#16639)
2. cherry-pick [fix](brpc) solve bthread hang problem (#17206)
3. cherry-pick [deps](libhdfs) add official hadoop libhdfs for x86 (#17435)
---
be/CMakeLists.txt | 25 +-
be/src/common/config.h | 16 +-
be/src/io/CMakeLists.txt | 1 +
be/src/{util/hdfs_util.cpp => io/fs/err_utils.cpp} | 39 +-
be/src/{util/hdfs_util.h => io/fs/err_utils.h} | 26 +-
be/src/{util/hdfs_util.h => io/fs/hdfs.h} | 27 +-
be/src/io/hdfs_builder.cpp | 7 +-
be/src/io/hdfs_builder.h | 8 +-
be/src/io/hdfs_file_reader.cpp | 14 +-
be/src/io/hdfs_writer.cpp | 21 +-
be/src/service/internal_service.cpp | 927 +++++++++++++--------
be/src/service/internal_service.h | 9 +-
be/src/util/blocking_priority_queue.hpp | 12 +
be/src/util/doris_metrics.h | 10 +
be/src/util/hdfs_storage_backend.cpp | 5 +-
be/src/util/hdfs_storage_backend.h | 3 +-
be/src/util/hdfs_util.cpp | 3 +-
be/src/util/hdfs_util.h | 5 +-
be/src/util/jni-util.cpp | 97 +--
be/src/util/jni-util.h | 5 +
be/src/util/libjvm_loader.cpp | 9 +
be/src/util/priority_thread_pool.hpp | 11 +-
bin/start_be.sh | 54 +-
build.sh | 4 +
conf/be.conf | 4 +-
.../maint-monitor/monitor-metrics/metrics.md | 6 +
.../org/apache/doris/clone/BeLoadRebalancer.java | 2 +-
gensrc/proto/internal_service.proto | 1 +
28 files changed, 842 insertions(+), 509 deletions(-)
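The common pattern across the internal_service.cpp hunks below: each brpc handler moves its body into a closure submitted to a pthread-backed work pool, so a std::mutex wait inside the handler blocks a plain pthread instead of a brpc bthread worker; if the pool queue is full, the RPC fails fast with CANCELLED. A minimal sketch of that pattern (WorkPool and PResponse are placeholders, not the real Doris types; the real code uses PriorityThreadPool and the concrete request/response types):

    // Sketch only: the handler-offload pattern used throughout this commit.
    void handle_rpc(WorkPool& pool, PResponse* response, google::protobuf::Closure* done) {
        bool ret = pool.try_offer([response, done]() {
            brpc::ClosureGuard closure_guard(done); // finishes the RPC when the task ends
            // ... handler body: may block on std::mutex without stalling a bthread ...
            Status::OK().to_protobuf(response->mutable_status());
        });
        if (!ret) { // queue full: fail fast rather than block the bthread
            brpc::ClosureGuard closure_guard(done);
            response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
            response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
        }
    }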
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 114e7be057..b3ad886910 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -398,9 +398,6 @@ set_target_properties(k5crypto PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/li
add_library(gssapi_krb5 STATIC IMPORTED)
set_target_properties(gssapi_krb5 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libgssapi_krb5.a)
-add_library(hdfs3 STATIC IMPORTED)
-set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libhdfs3.a)
-
find_program(THRIFT_COMPILER thrift ${CMAKE_SOURCE_DIR}/bin)
if (OS_MACOSX)
@@ -719,12 +716,32 @@ set(COMMON_THIRDPARTY
# put this after lz4 to avoid using lz4 lib in librdkafka
librdkafka_cpp
librdkafka
- hdfs3
xml2
lzma
simdjson
)
+if (ARCH_AMD64 AND OS_LINUX)
+ add_library(hadoop_hdfs STATIC IMPORTED)
+    set_target_properties(hadoop_hdfs PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/hadoop_hdfs/native/libhdfs.a)
+
+ set(COMMON_THIRDPARTY
+ ${COMMON_THIRDPARTY}
+ hadoop_hdfs
+ )
+ add_definitions(-DUSE_HADOOP_HDFS)
+else()
+ add_library(hdfs3 STATIC IMPORTED)
+    set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libhdfs3.a)
+
+ # TODO: use arm hadoop hdfs to replace this
+ set(COMMON_THIRDPARTY
+ ${COMMON_THIRDPARTY}
+ hdfs3
+ )
+ add_definitions(-DUSE_LIBHDFS3)
+endif()
+
if (OS_MACOSX)
set(COMMON_THIRDPARTY
${COMMON_THIRDPARTY}
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 94cad91159..91e77f0a9c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -35,7 +35,8 @@ CONF_Int32(be_port, "9060");
// port for brpc
CONF_Int32(brpc_port, "8060");
-// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
+// the number of bthreads for brpc, the default value is set to -1,
+// which means the number of bthreads is #cpu-cores
CONF_Int32(brpc_num_threads, "-1");
// port to brpc server for single replica load
@@ -395,8 +396,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
CONF_Int64(load_data_reserve_hours, "4");
// log error log will be removed after this time
CONF_mInt64(load_error_log_reserve_hours, "48");
-CONF_Int32(number_tablet_writer_threads, "16");
-CONF_Int32(number_slave_replica_download_threads, "64");
+
+// be brpc interface is classified into two categories: light and heavy
+// each category has different thread number
+// threads to handle heavy api interface, such as transmit_data/transmit_block etc.
+CONF_Int32(brpc_heavy_work_pool_threads, "192");
+// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start
+CONF_Int32(brpc_light_work_pool_threads, "32");
+CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
+CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");
// The maximum amount of data that can be processed by a stream load
CONF_mInt64(streaming_load_max_mb, "10240");
@@ -898,8 +906,6 @@ CONF_Int32(segcompaction_threshold_segment_num, "10");
// The segment whose row number above the threshold will be compacted during segcompaction
CONF_Int32(segcompaction_small_threshold, "1048576");
-CONF_String(jvm_max_heap_size, "1024M");
-
// enable java udf and jdbc scannode
CONF_Bool(enable_java_support, "true");
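The four new knobs are consumed in the PInternalServiceImpl constructor (see the internal_service.cpp hunks below); roughly, as a sketch:

    // Sketch: how the new config values size the two brpc work pools.
    PriorityThreadPool heavy(config::brpc_heavy_work_pool_threads,        // default 192
                             config::brpc_heavy_work_pool_max_queue_size, // default 10240
                             "brpc_heavy");
    PriorityThreadPool light(config::brpc_light_work_pool_threads,        // default 32
                             config::brpc_light_work_pool_max_queue_size, // default 10240
                             "brpc_light");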
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index 4096d2557b..2a36d4a3c2 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -34,6 +34,7 @@ set(IO_FILES
local_file_writer.cpp
s3_reader.cpp
s3_writer.cpp
+ fs/err_utils.cpp
fs/file_system_map.cpp
fs/local_file_reader.cpp
fs/local_file_system.cpp
diff --git a/be/src/util/hdfs_util.cpp b/be/src/io/fs/err_utils.cpp
similarity index 53%
copy from be/src/util/hdfs_util.cpp
copy to be/src/io/fs/err_utils.cpp
index b58fc75e46..d08ea1bee4 100644
--- a/be/src/util/hdfs_util.cpp
+++ b/be/src/io/fs/err_utils.cpp
@@ -14,29 +14,40 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+#include "io/fs/err_utils.h"
-#include "util/hdfs_util.h"
+#include <fmt/format.h>
-#include <util/string_util.h>
+#include <sstream>
-#include "common/config.h"
-#include "common/logging.h"
+#include "io/fs/hdfs.h"
namespace doris {
+namespace io {
-HDFSHandle& HDFSHandle::instance() {
- static HDFSHandle hdfs_handle;
- return hdfs_handle;
+std::string errno_to_str() {
+ char buf[1024];
+ return fmt::format("({}), {}", errno, strerror_r(errno, buf, 1024));
}
-hdfsFS HDFSHandle::create_hdfs_fs(HDFSCommonBuilder& hdfs_builder) {
- hdfsFS hdfs_fs = hdfsBuilderConnect(hdfs_builder.get());
- if (hdfs_fs == nullptr) {
- LOG(WARNING) << "connect to hdfs failed."
- << ", error: " << hdfsGetLastError();
- return nullptr;
+std::string errcode_to_str(const std::error_code& ec) {
+ return fmt::format("({}), {}", ec.value(), ec.message());
+}
+
+std::string hdfs_error() {
+ std::stringstream ss;
+ char buf[1024];
+ ss << "(" << errno << "), " << strerror_r(errno, buf, 1024) << ")";
+#ifdef USE_HADOOP_HDFS
+ char* root_cause = hdfsGetLastExceptionRootCause();
+ if (root_cause != nullptr) {
+ ss << ", reason: " << root_cause;
}
- return hdfs_fs;
+#else
+ ss << ", reason: " << hdfsGetLastError();
+#endif
+ return ss.str();
}
+} // namespace io
} // namespace doris
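These helpers replace the direct hdfsGetLastError() calls at every call site below (hdfs_file_reader.cpp, hdfs_writer.cpp). A sketch of a typical converted call site (compare HdfsFsCache::_create_fs below):

    hdfsFS fs = hdfsBuilderConnect(builder.get());
    if (fs == nullptr) {
        // io::hdfs_error() includes errno plus, under USE_HADOOP_HDFS, the JNI
        // root cause from hdfsGetLastExceptionRootCause(); under USE_LIBHDFS3
        // it falls back to hdfsGetLastError().
        return Status::InternalError("connect to hdfs failed. error: {}", io::hdfs_error());
    }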
diff --git a/be/src/util/hdfs_util.h b/be/src/io/fs/err_utils.h
similarity index 73%
copy from be/src/util/hdfs_util.h
copy to be/src/io/fs/err_utils.h
index f7bfc14b3a..31ca702c32 100644
--- a/be/src/util/hdfs_util.h
+++ b/be/src/io/fs/err_utils.h
@@ -17,27 +17,15 @@
#pragma once
-#include <hdfs/hdfs.h>
-
-#include <map>
-#include <memory>
#include <string>
-
-#include "common/status.h"
-#include "io/hdfs_builder.h"
+#include <system_error>
namespace doris {
+namespace io {
-class HDFSHandle {
-public:
- ~HDFSHandle() {}
-
- static HDFSHandle& instance();
-
- hdfsFS create_hdfs_fs(HDFSCommonBuilder& builder);
-
-private:
- HDFSHandle() {}
-};
+std::string errno_to_str();
+std::string errcode_to_str(const std::error_code& ec);
+std::string hdfs_error();
-} // namespace doris
\ No newline at end of file
+} // namespace io
+} // namespace doris
diff --git a/be/src/util/hdfs_util.h b/be/src/io/fs/hdfs.h
similarity index 72%
copy from be/src/util/hdfs_util.h
copy to be/src/io/fs/hdfs.h
index f7bfc14b3a..eb9e1b2c07 100644
--- a/be/src/util/hdfs_util.h
+++ b/be/src/io/fs/hdfs.h
@@ -17,27 +17,8 @@
#pragma once
+#ifdef USE_HADOOP_HDFS
+#include <hadoop_hdfs/hdfs.h>
+#else
#include <hdfs/hdfs.h>
-
-#include <map>
-#include <memory>
-#include <string>
-
-#include "common/status.h"
-#include "io/hdfs_builder.h"
-
-namespace doris {
-
-class HDFSHandle {
-public:
- ~HDFSHandle() {}
-
- static HDFSHandle& instance();
-
- hdfsFS create_hdfs_fs(HDFSCommonBuilder& builder);
-
-private:
- HDFSHandle() {}
-};
-
-} // namespace doris
\ No newline at end of file
+#endif
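Callers now include this wrapper instead of <hdfs/hdfs.h> (hdfs_builder.h below is converted the same way); both clients expose the same hdfs.h C API, and code that genuinely differs branches on the new build flags. Sketch:

    #include "io/fs/hdfs.h" // picks hadoop_hdfs/hdfs.h or hdfs/hdfs.h at compile time

    #ifdef USE_HADOOP_HDFS
        // official Hadoop libhdfs (JNI over the Java client), x86_64 Linux only here
    #else // USE_LIBHDFS3
        // libhdfs3, the native C++ client, kept for other platforms
    #endif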
diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp
index c82af8e299..c971173c91 100644
--- a/be/src/io/hdfs_builder.cpp
+++ b/be/src/io/hdfs_builder.cpp
@@ -18,6 +18,7 @@
#include "io/hdfs_builder.h"
#include <fmt/format.h>
+#include <thrift/protocol/TDebugProtocol.h>
#include <fstream>
@@ -26,6 +27,7 @@
#include "util/string_util.h"
#include "util/uid_util.h"
#include "util/url_coding.h"
+
namespace doris {
Status HDFSCommonBuilder::init_hdfs_builder() {
@@ -36,6 +38,7 @@ Status HDFSCommonBuilder::init_hdfs_builder() {
return Status::InternalError(
"failed to init HDFSCommonBuilder, please check
be/conf/hdfs-site.xml and be.out");
}
+ hdfsBuilderSetForceNewInstance(hdfs_builder);
return Status::OK();
}
@@ -54,7 +57,10 @@ Status HDFSCommonBuilder::run_kinit() {
if (!rc) {
return Status::InternalError("Kinit failed, errMsg: " + msg);
}
+#ifdef USE_LIBHDFS3
+ hdfsBuilderSetPrincipal(hdfs_builder, hdfs_kerberos_principal.c_str());
hdfsBuilderSetKerbTicketCachePath(hdfs_builder, ticket_path.c_str());
+#endif
return Status::OK();
}
@@ -97,7 +103,6 @@ Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* build
if (hdfsParams.__isset.hdfs_kerberos_principal) {
builder->need_kinit = true;
builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
-        hdfsBuilderSetPrincipal(builder->get(), hdfsParams.hdfs_kerberos_principal.c_str());
}
if (hdfsParams.__isset.hdfs_kerberos_keytab) {
builder->need_kinit = true;
diff --git a/be/src/io/hdfs_builder.h b/be/src/io/hdfs_builder.h
index eb63fab1b5..26164ce6ed 100644
--- a/be/src/io/hdfs_builder.h
+++ b/be/src/io/hdfs_builder.h
@@ -17,10 +17,9 @@
#pragma once
-#include <hdfs/hdfs.h>
-
#include "gen_cpp/PlanNodes_types.h"
#include "io/file_reader.h"
+#include "io/fs/hdfs.h"
namespace doris {
@@ -38,9 +37,12 @@ class HDFSCommonBuilder {
public:
HDFSCommonBuilder() = default;
~HDFSCommonBuilder() {
+#if defined(USE_LIBHDFS3) || defined(BE_TEST)
+ // for hadoop hdfs, the hdfs_builder will be freed in hdfsConnect
if (hdfs_builder != nullptr) {
hdfsFreeBuilder(hdfs_builder);
}
+#endif
}
// Must call this to init hdfs_builder first.
@@ -51,7 +53,7 @@ public:
Status run_kinit();
private:
- hdfsBuilder* hdfs_builder;
+ hdfsBuilder* hdfs_builder = nullptr;
bool need_kinit {false};
std::string hdfs_kerberos_keytab;
std::string hdfs_kerberos_principal;
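The guarded destructor above encodes an ownership difference between the two clients: with the official hadoop libhdfs, hdfsBuilderConnect() frees the builder itself, so freeing it again would be a double free. A sketch of the rule, assuming the usual connect flow:

    HDFSCommonBuilder builder;
    RETURN_IF_ERROR(builder.init_hdfs_builder());
    hdfsFS fs = hdfsBuilderConnect(builder.get());
    // USE_HADOOP_HDFS: hdfsBuilderConnect() has already freed the builder;
    //   the destructor must not call hdfsFreeBuilder() again.
    // USE_LIBHDFS3 / BE_TEST: the builder is still owned here, so the
    //   destructor frees it if hdfs_builder is non-null.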
diff --git a/be/src/io/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp
index 7117ec0e7c..7e9394d707 100644
--- a/be/src/io/hdfs_file_reader.cpp
+++ b/be/src/io/hdfs_file_reader.cpp
@@ -14,11 +14,13 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+
#include "io/hdfs_file_reader.h"
#include <sys/stat.h>
#include <unistd.h>
+#include "io/fs/err_utils.h"
#include "service/backend_options.h"
namespace doris {
@@ -100,12 +102,12 @@ Status HdfsFileReader::open() {
if (_hdfs_fs == nullptr) {
return Status::InternalError(
"open file failed. (BE: {}) namenode:{}, path:{}, err:
{}",
- BackendOptions::get_localhost(), _namenode, _path,
hdfsGetLastError());
+ BackendOptions::get_localhost(), _namenode, _path,
io::hdfs_error());
}
} else {
return Status::InternalError("open file failed. (BE: {})
namenode:{}, path:{}, err: {}",
BackendOptions::get_localhost(),
_namenode, _path,
- hdfsGetLastError());
+ io::hdfs_error());
}
}
VLOG_NOTICE << "open file, namenode:" << _namenode << ", path:" << _path;
@@ -174,7 +176,7 @@ Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r
if (loop_read < 0) {
return Status::InternalError(
"Read hdfs file failed. (BE: {}) namenode:{}, path:{},
err: {}",
- BackendOptions::get_localhost(), _namenode, _path,
hdfsGetLastError());
+ BackendOptions::get_localhost(), _namenode, _path,
io::hdfs_error());
}
if (loop_read == 0) {
break;
@@ -192,7 +194,7 @@ int64_t HdfsFileReader::size() {
hdfsFileInfo* file_info = hdfsGetPathInfo(_hdfs_fs, _path.c_str());
if (file_info == nullptr) {
return Status::IOError("failed to get path info, path: {},
error: {}", _path,
- hdfsGetLastError());
+ io::hdfs_error());
}
_file_size = file_info->mSize;
hdfsFreeFileInfo(file_info, 1);
@@ -205,7 +207,7 @@ Status HdfsFileReader::seek(int64_t position) {
int res = hdfsSeek(_hdfs_fs, _hdfs_file, position);
if (res != 0) {
return Status::InternalError("Seek to offset failed. (BE: {})
offset={}, err: {}",
- BackendOptions::get_localhost(),
position, hdfsGetLastError());
+ BackendOptions::get_localhost(),
position, io::hdfs_error());
}
_current_offset = position;
return Status::OK();
@@ -223,7 +225,7 @@ Status HdfsFsCache::_create_fs(THdfsParams& hdfs_params, hdfsFS* fs) {
RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder));
hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
if (hdfs_fs == nullptr) {
- return Status::InternalError("connect to hdfs failed. error: {}",
hdfsGetLastError());
+ return Status::InternalError("connect to hdfs failed. error: {}",
io::hdfs_error());
}
*fs = hdfs_fs;
return Status::OK();
diff --git a/be/src/io/hdfs_writer.cpp b/be/src/io/hdfs_writer.cpp
index e6814438f1..69554d82a9 100644
--- a/be/src/io/hdfs_writer.cpp
+++ b/be/src/io/hdfs_writer.cpp
@@ -17,10 +17,14 @@
#include "io/hdfs_writer.h"
+#include <bthread/bthread.h>
+
#include <filesystem>
#include "common/logging.h"
+#include "io/fs/err_utils.h"
#include "service/backend_options.h"
+#include "util/stack_util.h"
namespace doris {
@@ -56,16 +60,18 @@ Status HDFSWriter::open() {
std::filesystem::path hdfs_path(_path);
std::string hdfs_dir = hdfs_path.parent_path().string();
+ LOG(INFO) << "hdfs write open: " << hdfs_dir << get_stack_trace();
exists = hdfsExists(_hdfs_fs, hdfs_dir.c_str());
if (exists != 0) {
- VLOG_NOTICE << "hdfs dir doesn't exist, create it: " << hdfs_dir;
+ LOG(INFO) << "hdfs dir doesn't exist, create it: " << hdfs_dir << ",
path: " << _path
+ << get_stack_trace();
int ret = hdfsCreateDirectory(_hdfs_fs, hdfs_dir.c_str());
if (ret != 0) {
std::stringstream ss;
ss << "create dir failed. "
<< "(BE: " << BackendOptions::get_localhost() << ")"
<< " namenode: " << _namenode << " path: " << hdfs_dir
- << ", err: " << hdfsGetLastError();
+ << ", err: " << io::hdfs_error();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
@@ -76,7 +82,7 @@ Status HDFSWriter::open() {
std::stringstream ss;
ss << "open file failed. "
<< "(BE: " << BackendOptions::get_localhost() << ")"
- << " namenode:" << _namenode << " path:" << _path << ", err: " <<
hdfsGetLastError();
+ << " namenode:" << _namenode << " path:" << _path << ", err: " <<
io::hdfs_error();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
@@ -94,7 +100,7 @@ Status HDFSWriter::write(const uint8_t* buf, size_t buf_len, size_t* written_len
std::stringstream ss;
ss << "write file failed. "
<< "(BE: " << BackendOptions::get_localhost() << ")"
- << "namenode:" << _namenode << " path:" << _path << ", err: " <<
hdfsGetLastError();
+ << "namenode:" << _namenode << " path:" << _path << ", err: " <<
io::hdfs_error();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
@@ -121,7 +127,7 @@ Status HDFSWriter::close() {
std::stringstream ss;
ss << "failed to flush hdfs file. "
<< "(BE: " << BackendOptions::get_localhost() << ")"
- << "namenode:" << _namenode << " path:" << _path << ", err: " <<
hdfsGetLastError();
+ << "namenode:" << _namenode << " path:" << _path << ", err: " <<
io::hdfs_error();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
@@ -135,11 +141,12 @@ Status HDFSWriter::close() {
Status HDFSWriter::_connect() {
HDFSCommonBuilder builder;
- RETURN_IF_ERROR(createHDFSBuilder(_properties, &builder));
+ THdfsParams hdfsParams = parse_properties(_properties);
+ RETURN_IF_ERROR(createHDFSBuilder(hdfsParams, &builder));
_hdfs_fs = hdfsBuilderConnect(builder.get());
if (_hdfs_fs == nullptr) {
return Status::InternalError("connect to hdfs failed. namenode
address:{}, error {}",
- _namenode, hdfsGetLastError());
+ _namenode, io::hdfs_error());
}
return Status::OK();
}
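HDFSWriter::_connect() now converts the raw property map to THdfsParams first, so readers and writers share the one createHDFSBuilder(const THdfsParams&, ...) path. The resulting flow, sketched (parse_properties as used in the hunk above):

    THdfsParams params = parse_properties(_properties);
    HDFSCommonBuilder builder;
    RETURN_IF_ERROR(createHDFSBuilder(params, &builder));
    hdfsFS fs = hdfsBuilderConnect(builder.get());
    if (fs == nullptr) {
        return Status::InternalError("connect to hdfs failed. namenode address:{}, error {}",
                                     _namenode, io::hdfs_error());
    }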
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 1f658da09c..6db5a55668 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -61,7 +61,15 @@ using namespace ErrorCode;
const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, MetricUnit::NOUNIT);
+
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);
bthread_key_t btls_key;
@@ -95,16 +103,42 @@ private:
PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
: _exec_env(exec_env),
-          _tablet_worker_pool(config::number_tablet_writer_threads, 10240, "tablet_writer"),
-          _slave_replica_worker_pool(config::number_slave_replica_download_threads, 10240,
-                                     "replica_download") {
-    REGISTER_HOOK_METRIC(add_batch_task_queue_size,
-                         [this]() { return _tablet_worker_pool.get_queue_size(); });
+          _heavy_work_pool(config::brpc_heavy_work_pool_threads,
+                           config::brpc_heavy_work_pool_max_queue_size, "brpc_heavy"),
+          _light_work_pool(config::brpc_light_work_pool_threads,
+                           config::brpc_light_work_pool_max_queue_size, "brpc_light") {
+    REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
+                         [this]() { return _heavy_work_pool.get_queue_size(); });
+    REGISTER_HOOK_METRIC(light_work_pool_queue_size,
+                         [this]() { return _light_work_pool.get_queue_size(); });
+    REGISTER_HOOK_METRIC(heavy_work_active_threads,
+                         [this]() { return _heavy_work_pool.get_active_threads(); });
+    REGISTER_HOOK_METRIC(light_work_active_threads,
+                         [this]() { return _light_work_pool.get_active_threads(); });
+
+    REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size,
+                         []() { return config::brpc_heavy_work_pool_max_queue_size; });
+    REGISTER_HOOK_METRIC(light_work_pool_max_queue_size,
+                         []() { return config::brpc_light_work_pool_max_queue_size; });
+    REGISTER_HOOK_METRIC(heavy_work_max_threads,
+                         []() { return config::brpc_heavy_work_pool_threads; });
+    REGISTER_HOOK_METRIC(light_work_max_threads,
+                         []() { return config::brpc_light_work_pool_threads; });
+
CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
}
PInternalServiceImpl::~PInternalServiceImpl() {
- DEREGISTER_HOOK_METRIC(add_batch_task_queue_size);
+ DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(light_work_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(heavy_work_active_threads);
+ DEREGISTER_HOOK_METRIC(light_work_active_threads);
+
+ DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size);
+ DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size);
+ DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
+ DEREGISTER_HOOK_METRIC(light_work_max_threads);
+
CHECK_EQ(0, bthread_key_delete(btls_key));
}
@@ -132,7 +166,7 @@ void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController
_transmit_data(cntl_base, new_request, response, new_done, st);
}
-void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* controller,
const PTransmitDataParams* request,
PTransmitDataResult* response,
google::protobuf::Closure* done,
@@ -170,22 +204,31 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c
                                              const PTabletWriterOpenRequest* request,
                                              PTabletWriterOpenResult* response,
                                              google::protobuf::Closure* done) {
-    VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id()
-             << ", txn_id=" << request->txn_id();
- brpc::ClosureGuard closure_guard(done);
- auto st = _exec_env->load_channel_mgr()->open(*request);
- if (!st.ok()) {
- LOG(WARNING) << "load channel open failed, message=" << st << ", id="
<< request->id()
- << ", index_id=" << request->index_id() << ", txn_id=" <<
request->txn_id();
+ bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+ VLOG_RPC << "tablet writer open, id=" << request->id()
+ << ", index_id=" << request->index_id() << ", txn_id=" <<
request->txn_id();
+ brpc::ClosureGuard closure_guard(done);
+ auto st = _exec_env->load_channel_mgr()->open(*request);
+ if (!st.ok()) {
+ LOG(WARNING) << "load channel open failed, message=" << st << ",
id=" << request->id()
+ << ", index_id=" << request->index_id()
+ << ", txn_id=" << request->txn_id();
+ }
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
- st.to_protobuf(response->mutable_status());
}
-void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* controller,
                                               const PExecPlanFragmentRequest* request,
                                               PExecPlanFragmentResult* response,
                                               google::protobuf::Closure* done) {
- auto span = telemetry::start_rpc_server_span("exec_plan_fragment",
cntl_base);
+ auto span = telemetry::start_rpc_server_span("exec_plan_fragment",
controller);
auto scope = OpentelemetryScope {span};
brpc::ClosureGuard closure_guard(done);
auto st = Status::OK();
@@ -199,67 +242,95 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
st.to_protobuf(response->mutable_status());
}
-void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* controller,
                                                       const PExecPlanFragmentRequest* request,
                                                       PExecPlanFragmentResult* response,
                                                       google::protobuf::Closure* done) {
-    exec_plan_fragment(cntl_base, request, response, done);
+    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
+ exec_plan_fragment(controller, request, response, done);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* controller,
                                                    const PExecPlanFragmentStartRequest* request,
                                                    PExecPlanFragmentResult* result,
                                                    google::protobuf::Closure* done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
- auto scope = OpentelemetryScope {span};
- brpc::ClosureGuard closure_guard(done);
- auto st = _exec_env->fragment_mgr()->start_query_execution(request);
- st.to_protobuf(result->mutable_status());
+    bool ret = _light_work_pool.try_offer([this, controller, request, result, done]() {
+        auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+ auto scope = OpentelemetryScope {span};
+ brpc::ClosureGuard closure_guard(done);
+ auto st = _exec_env->fragment_mgr()->start_query_execution(request);
+ st.to_protobuf(result->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ result->mutable_status()->add_error_msgs("fail to offer request to the
work pool");
+ }
}
-void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller,
                                                    const PTabletWriterAddBlockRequest* request,
                                                    PTabletWriterAddBlockResult* response,
                                                    google::protobuf::Closure* done) {
- // TODO(zxy) delete in 1.2 version
-    google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
+    bool ret = _heavy_work_pool.try_offer([this, controller, request, response, done]() {
+        // TODO(zxy) delete in 1.2 version
+        google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
- _tablet_writer_add_block(cntl_base, request, response, new_done);
+ _tablet_writer_add_block(controller, request, response, new_done);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
void PInternalServiceImpl::tablet_writer_add_block_by_http(
-        google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
+        google::protobuf::RpcController* controller, const ::doris::PEmptyRequest* request,
         PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) {
-    PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest();
-    google::protobuf::Closure* new_done =
-            new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request,
-                                                                                       cntl);
- if (st.ok()) {
- _tablet_writer_add_block(cntl_base, new_request, response, new_done);
- } else {
- st.to_protobuf(response->mutable_status());
+    bool ret = _heavy_work_pool.try_offer([this, controller, response, done]() {
+        PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest();
+        google::protobuf::Closure* new_done =
+                new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(
+                new_request, cntl);
+        if (st.ok()) {
+            _tablet_writer_add_block(controller, new_request, response, new_done);
+ } else {
+ st.to_protobuf(response->mutable_status());
+ }
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
-void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* controller,
                                                     const PTabletWriterAddBlockRequest* request,
                                                     PTabletWriterAddBlockResult* response,
                                                     google::protobuf::Closure* done) {
- VLOG_RPC << "tablet writer add block, id=" << request->id()
- << ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id()
- << ", current_queued_size=" <<
_tablet_worker_pool.get_queue_size();
int64_t submit_task_time_ns = MonotonicNanos();
- _tablet_worker_pool.offer([request, response, done, submit_task_time_ns,
this]() {
+ bool ret = _heavy_work_pool.try_offer([request, response, done,
submit_task_time_ns, this]() {
int64_t wait_execution_time_ns = MonotonicNanos() -
submit_task_time_ns;
brpc::ClosureGuard closure_guard(done);
int64_t execution_time_ns = 0;
{
SCOPED_RAW_TIMER(&execution_time_ns);
-
            auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
            if (!st.ok()) {
                LOG(WARNING) << "tablet writer add block failed, message=" << st
@@ -272,6 +343,12 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
        response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO);
});
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController* cntl_base,
@@ -303,13 +380,13 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
PTabletWriterAddBatchResult* response,
                                                    google::protobuf::Closure* done) {
VLOG_RPC << "tablet writer add batch, id=" << request->id()
- << ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id()
- << ", current_queued_size=" <<
_tablet_worker_pool.get_queue_size();
+ << ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id();
// add batch maybe cost a lot of time, and this callback thread will be
held.
// this will influence query execution, because the pthreads under bthread
may be
// exhausted, so we put this to a local thread pool to process
int64_t submit_task_time_ns = MonotonicNanos();
- _tablet_worker_pool.offer([cntl_base, request, response, done,
submit_task_time_ns, this]() {
+ bool ret = _heavy_work_pool.offer([cntl_base, request, response, done,
submit_task_time_ns,
+ this]() {
        int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns;
brpc::ClosureGuard closure_guard(done);
int64_t execution_time_ns = 0;
@@ -332,20 +409,32 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
        response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO);
});
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* controller,
                                                const PTabletWriterCancelRequest* request,
                                                PTabletWriterCancelResult* response,
                                                google::protobuf::Closure* done) {
-    VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id()
- << ", sender_id=" << request->sender_id();
- brpc::ClosureGuard closure_guard(done);
- auto st = _exec_env->load_channel_mgr()->cancel(*request);
- if (!st.ok()) {
- LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
- << ", index_id=" << request->index_id()
- << ", sender_id=" << request->sender_id();
+ bool ret = _light_work_pool.try_offer([this, request, done]() {
+ VLOG_RPC << "tablet writer cancel, id=" << request->id()
+ << ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id();
+ brpc::ClosureGuard closure_guard(done);
+ auto st = _exec_env->load_channel_mgr()->cancel(*request);
+ if (!st.ok()) {
+ LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
+ << ", index_id=" << request->index_id()
+ << ", sender_id=" << request->sender_id();
+ }
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
}
}
@@ -377,313 +466,408 @@ Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request,
}
}
-void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* controller,
                                                 const PCancelPlanFragmentRequest* request,
                                                 PCancelPlanFragmentResult* result,
                                                 google::protobuf::Closure* done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", cntl_base);
- auto scope = OpentelemetryScope {span};
- brpc::ClosureGuard closure_guard(done);
- TUniqueId tid;
- tid.__set_hi(request->finst_id().hi());
- tid.__set_lo(request->finst_id().lo());
-
- Status st;
- if (request->has_cancel_reason()) {
- LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
- << ", reason: " << request->cancel_reason();
- st = _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
- } else {
- LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
- st = _exec_env->fragment_mgr()->cancel(tid);
- }
- if (!st.ok()) {
- LOG(WARNING) << "cancel plan fragment failed, errmsg=" << st;
+    bool ret = _light_work_pool.try_offer([this, controller, request, result, done]() {
+        auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+ auto scope = OpentelemetryScope {span};
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId tid;
+ tid.__set_hi(request->finst_id().hi());
+ tid.__set_lo(request->finst_id().lo());
+
+ Status st = Status::OK();
+ if (request->has_cancel_reason()) {
+ LOG(INFO) << "cancel fragment, fragment_instance_id=" <<
print_id(tid)
+ << ", reason: " << request->cancel_reason();
+ _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+ } else {
+ LOG(INFO) << "cancel fragment, fragment_instance_id=" <<
print_id(tid);
+ _exec_env->fragment_mgr()->cancel(tid);
+ }
+        // TODO: the logic seems useless, cancel only return Status::OK. remove it
+ st.to_protobuf(result->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ result->mutable_status()->add_error_msgs("fail to offer request to the
work pool");
}
- st.to_protobuf(result->mutable_status());
}
-void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controller,
const PFetchDataRequest* request,
PFetchDataResult* result,
google::protobuf::Closure* done) {
- brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
- GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
- _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    bool ret = _heavy_work_pool.try_offer([this, controller, request, result, done]() {
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+ GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+ _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ result->mutable_status()->add_error_msgs("fail to offer request to the
work pool");
+ }
}
void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller,
                                              const PFetchTableSchemaRequest* request,
                                              PFetchTableSchemaResult* result,
                                              google::protobuf::Closure* done) {
- VLOG_RPC << "fetch table schema";
- brpc::ClosureGuard closure_guard(done);
- TFileScanRange file_scan_range;
- Status st = Status::OK();
- {
-        const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data());
- uint32_t len = request->file_scan_range().size();
- st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+ bool ret = _heavy_work_pool.try_offer([request, result, done]() {
+ VLOG_RPC << "fetch table schema";
+ brpc::ClosureGuard closure_guard(done);
+ TFileScanRange file_scan_range;
+ Status st = Status::OK();
+ {
+            const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data());
+ uint32_t len = request->file_scan_range().size();
+ st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+ if (!st.ok()) {
+ LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ }
+ if (file_scan_range.__isset.ranges == false) {
+ st = Status::InternalError("can not get TFileRangeDesc.");
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ if (file_scan_range.__isset.params == false) {
+ st = Status::InternalError("can not get TFileScanRangeParams.");
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ const TFileRangeDesc& range = file_scan_range.ranges.at(0);
+ const TFileScanRangeParams& params = file_scan_range.params;
+
+ std::unique_ptr<vectorized::GenericReader> reader(nullptr);
+        std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema"));
+ switch (params.format_type) {
+ case TFileFormatType::FORMAT_CSV_PLAIN:
+ case TFileFormatType::FORMAT_CSV_GZ:
+ case TFileFormatType::FORMAT_CSV_BZ2:
+ case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+ case TFileFormatType::FORMAT_CSV_LZOP:
+ case TFileFormatType::FORMAT_CSV_DEFLATE: {
+ // file_slots is no use
+ std::vector<SlotDescriptor*> file_slots;
+            reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots));
+ break;
+ }
+ case TFileFormatType::FORMAT_PARQUET: {
+ reader.reset(new vectorized::ParquetReader(params, range));
+ break;
+ }
+ case TFileFormatType::FORMAT_ORC: {
+ std::vector<std::string> column_names;
+            reader.reset(new vectorized::OrcReader(params, range, column_names, ""));
+ break;
+ }
+ case TFileFormatType::FORMAT_JSON: {
+ std::vector<SlotDescriptor*> file_slots;
+            reader.reset(new vectorized::NewJsonReader(profile.get(), params, range, file_slots));
+ break;
+ }
+ default:
+ st = Status::InternalError("Not supported file format in fetch
table schema: {}",
+ params.format_type);
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ std::vector<std::string> col_names;
+ std::vector<TypeDescriptor> col_types;
+ st = reader->get_parsed_schema(&col_names, &col_types);
if (!st.ok()) {
LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
st.to_protobuf(result->mutable_status());
return;
}
- }
- if (file_scan_range.__isset.ranges == false) {
- st = Status::InternalError("can not get TFileRangeDesc.");
- st.to_protobuf(result->mutable_status());
- return;
- }
- if (file_scan_range.__isset.params == false) {
- st = Status::InternalError("can not get TFileScanRangeParams.");
- st.to_protobuf(result->mutable_status());
- return;
- }
- const TFileRangeDesc& range = file_scan_range.ranges.at(0);
- const TFileScanRangeParams& params = file_scan_range.params;
-
- std::unique_ptr<vectorized::GenericReader> reader(nullptr);
-    std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema"));
- switch (params.format_type) {
- case TFileFormatType::FORMAT_CSV_PLAIN:
- case TFileFormatType::FORMAT_CSV_GZ:
- case TFileFormatType::FORMAT_CSV_BZ2:
- case TFileFormatType::FORMAT_CSV_LZ4FRAME:
- case TFileFormatType::FORMAT_CSV_LZOP:
- case TFileFormatType::FORMAT_CSV_DEFLATE: {
- // file_slots is no use
- std::vector<SlotDescriptor*> file_slots;
-        reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots));
- break;
- }
- case TFileFormatType::FORMAT_PARQUET: {
- reader.reset(new vectorized::ParquetReader(params, range));
- break;
- }
- case TFileFormatType::FORMAT_ORC: {
- std::vector<std::string> column_names;
-        reader.reset(new vectorized::OrcReader(params, range, column_names, ""));
- break;
- }
- case TFileFormatType::FORMAT_JSON: {
- std::vector<SlotDescriptor*> file_slots;
-        reader.reset(new vectorized::NewJsonReader(profile.get(), params, range, file_slots));
- break;
- }
- default:
- st = Status::InternalError("Not supported file format in fetch table
schema: {}",
- params.format_type);
- st.to_protobuf(result->mutable_status());
- return;
- }
- std::vector<std::string> col_names;
- std::vector<TypeDescriptor> col_types;
- st = reader->get_parsed_schema(&col_names, &col_types);
- if (!st.ok()) {
- LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+ result->set_column_nums(col_names.size());
+ for (size_t idx = 0; idx < col_names.size(); ++idx) {
+ result->add_column_names(col_names[idx]);
+ }
+ for (size_t idx = 0; idx < col_types.size(); ++idx) {
+ PTypeDesc* type_desc = result->add_column_types();
+ col_types[idx].to_protobuf(type_desc);
+ }
st.to_protobuf(result->mutable_status());
- return;
- }
- result->set_column_nums(col_names.size());
- for (size_t idx = 0; idx < col_names.size(); ++idx) {
- result->add_column_names(col_names[idx]);
- }
- for (size_t idx = 0; idx < col_types.size(); ++idx) {
- PTypeDesc* type_desc = result->add_column_types();
- col_types[idx].to_protobuf(type_desc);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ result->mutable_status()->add_error_msgs("fail to offer request to the
work pool");
}
- st.to_protobuf(result->mutable_status());
}
void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
                                    const PProxyRequest* request, PProxyResult* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- // PProxyRequest is defined in gensrc/proto/internal_service.proto
- // Currently it supports 2 kinds of requests:
- // 1. get all kafka partition ids for given topic
- // 2. get all kafka partition offsets for given topic and timestamp.
- if (request->has_kafka_meta_request()) {
-        const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
- if (!kafka_request.partition_id_for_latest_offsets().empty()) {
- // get latest offsets for specified partition ids
- std::vector<PIntegerPair> partition_offsets;
- Status st = _exec_env->routine_load_task_executor()
- ->get_kafka_latest_offsets_for_partitions(
-                                request->kafka_meta_request(), &partition_offsets);
- if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
- for (const auto& entry : partition_offsets) {
- PIntegerPair* res = part_offsets->add_offset_times();
- res->set_key(entry.key());
- res->set_val(entry.val());
+ bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ // PProxyRequest is defined in gensrc/proto/internal_service.proto
+ // Currently it supports 2 kinds of requests:
+ // 1. get all kafka partition ids for given topic
+ // 2. get all kafka partition offsets for given topic and timestamp.
+ if (request->has_kafka_meta_request()) {
+            const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
+ if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+ // get latest offsets for specified partition ids
+ std::vector<PIntegerPair> partition_offsets;
+ Status st = _exec_env->routine_load_task_executor()
+ ->get_kafka_latest_offsets_for_partitions(
+                                    request->kafka_meta_request(), &partition_offsets);
+ if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
+ }
}
- }
- st.to_protobuf(response->mutable_status());
- return;
- } else if (!kafka_request.offset_times().empty()) {
-        // if offset_times() has elements, which means this request is to get offset by timestamp.
- std::vector<PIntegerPair> partition_offsets;
- Status st =
-
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
- request->kafka_meta_request(), &partition_offsets);
- if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
- for (const auto& entry : partition_offsets) {
- PIntegerPair* res = part_offsets->add_offset_times();
- res->set_key(entry.key());
- res->set_val(entry.val());
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else if (!kafka_request.offset_times().empty()) {
+            // if offset_times() has elements, which means this request is to get offset by timestamp.
+ std::vector<PIntegerPair> partition_offsets;
+ Status st = _exec_env->routine_load_task_executor()
+ ->get_kafka_partition_offsets_for_times(
+                                    request->kafka_meta_request(), &partition_offsets);
+ if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
+ }
}
- }
- st.to_protobuf(response->mutable_status());
- return;
- } else {
- // get partition ids of topic
- std::vector<int32_t> partition_ids;
-            Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
- request->kafka_meta_request(), &partition_ids);
- if (st.ok()) {
-                PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
- for (int32_t id : partition_ids) {
- kafka_result->add_partition_ids(id);
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else {
+ // get partition ids of topic
+ std::vector<int32_t> partition_ids;
+                Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+ request->kafka_meta_request(), &partition_ids);
+ if (st.ok()) {
+                    PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
+ for (int32_t id : partition_ids) {
+ kafka_result->add_partition_ids(id);
+ }
}
+ st.to_protobuf(response->mutable_status());
+ return;
}
- st.to_protobuf(response->mutable_status());
- return;
}
+ Status::OK().to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
- Status::OK().to_protobuf(response->mutable_status());
}
void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller,
const PUpdateCacheRequest* request,
PCacheResponse* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->update(request, response);
+ bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->update(request, response);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->set_status(PCacheStatus::CANCELED);
+ }
}
void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller,
const PFetchCacheRequest* request,
PFetchCacheResult* result,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->fetch(request, result);
+ bool ret = _heavy_work_pool.try_offer([this, request, result, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->fetch(request, result);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ result->set_status(PCacheStatus::CANCELED);
+ }
}
void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller,
const PClearCacheRequest* request,
PCacheResponse* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- _exec_env->result_cache()->clear(request, response);
+ bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ _exec_env->result_cache()->clear(request, response);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->set_status(PCacheStatus::CANCELED);
+ }
}
void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* controller,
                                        const ::doris::PMergeFilterRequest* request,
                                        ::doris::PMergeFilterResponse* response,
::google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
-    auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
- if (!st.ok()) {
- LOG(WARNING) << "merge meet error" << st.to_string();
+    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
+ if (!st.ok()) {
+ LOG(WARNING) << "merge meet error" << st.to_string();
+ }
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
- st.to_protobuf(response->mutable_status());
}
void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* controller,
                                        const ::doris::PPublishFilterRequest* request,
                                        ::doris::PPublishFilterResponse* response,
::google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
-    auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    UniqueId unique_id(request->query_id());
-    VLOG_NOTICE << "rpc apply_filter recv";
-    Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream);
- if (!st.ok()) {
- LOG(WARNING) << "apply filter meet error: " << st.to_string();
+    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        UniqueId unique_id(request->query_id());
+        VLOG_NOTICE << "rpc apply_filter recv";
+        Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream);
+ if (!st.ok()) {
+ LOG(WARNING) << "apply filter meet error: " << st.to_string();
+ }
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
- st.to_protobuf(response->mutable_status());
}
void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller,
                                     const PSendDataRequest* request, PSendDataResult* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- TUniqueId fragment_instance_id;
- fragment_instance_id.hi = request->fragment_instance_id().hi();
- fragment_instance_id.lo = request->fragment_instance_id().lo();
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
- if (pipe == nullptr) {
- response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs("pipe is null");
- } else {
- for (int i = 0; i < request->data_size(); ++i) {
- std::unique_ptr<PDataRow> row(new PDataRow());
- row->CopyFrom(request->data(i));
- Status s = pipe->append(std::move(row));
- if (!s.ok()) {
- response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs(s.to_string());
- return;
+ bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId fragment_instance_id;
+ fragment_instance_id.hi = request->fragment_instance_id().hi();
+ fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+ auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+ if (pipe == nullptr) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs("pipe is null");
+ } else {
+ for (int i = 0; i < request->data_size(); ++i) {
+ std::unique_ptr<PDataRow> row(new PDataRow());
+ row->CopyFrom(request->data(i));
+ Status s = pipe->append(std::move(row));
+ if (!s.ok()) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs(s.to_string());
+ return;
+ }
}
+ response->mutable_status()->set_status_code(0);
}
- response->mutable_status()->set_status_code(0);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
                                  const PCommitRequest* request, PCommitResult* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- TUniqueId fragment_instance_id;
- fragment_instance_id.hi = request->fragment_instance_id().hi();
- fragment_instance_id.lo = request->fragment_instance_id().lo();
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
- if (pipe == nullptr) {
- response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs("pipe is null");
- } else {
- pipe->finish();
- response->mutable_status()->set_status_code(0);
+ bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId fragment_instance_id;
+ fragment_instance_id.hi = request->fragment_instance_id().hi();
+ fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+ auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+ if (pipe == nullptr) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs("pipe is null");
+ } else {
+ pipe->finish();
+ response->mutable_status()->set_status_code(0);
+ }
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller,
                                    const PRollbackRequest* request, PRollbackResult* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- TUniqueId fragment_instance_id;
- fragment_instance_id.hi = request->fragment_instance_id().hi();
- fragment_instance_id.lo = request->fragment_instance_id().lo();
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
- if (pipe == nullptr) {
- response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs("pipe is null");
- } else {
- pipe->cancel("rollback");
- response->mutable_status()->set_status_code(0);
+ bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId fragment_instance_id;
+ fragment_instance_id.hi = request->fragment_instance_id().hi();
+ fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+ auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+ if (pipe == nullptr) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs("pipe is null");
+ } else {
+ pipe->cancel("rollback");
+ response->mutable_status()->set_status_code(0);
+ }
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
-void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* controller,
 const PConstantExprRequest* request,
 PConstantExprResult* response,
 google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-
- Status st = Status::OK();
- if (request->has_request()) {
+ bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ Status st = Status::OK();
st = _fold_constant_expr(request->request(), response);
- } else {
- // TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15
- st = _fold_constant_expr(cntl->request_attachment().to_string(), response);
- }
- if (!st.ok()) {
- LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st;
+ if (!st.ok()) {
+ LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st;
+ }
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
- st.to_protobuf(response->mutable_status());
}
Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request,
@@ -700,31 +884,48 @@ Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request,
return FoldConstantExecutor().fold_constant_vexpr(t_request, response);
}
-void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* controller,
const PTransmitDataParams* request,
PTransmitDataResult* response,
google::protobuf::Closure* done) {
- // TODO(zxy) delete in 1.2 version
- google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
- attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
+ bool ret = _heavy_work_pool.try_offer([this, controller, request, response, done]() {
+ // TODO(zxy) delete in 1.2 version
+ google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+ attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
- _transmit_block(cntl_base, request, response, new_done, Status::OK());
+ _transmit_block(controller, request, response, new_done, Status::OK());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
-void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* controller,
 const PEmptyRequest* request,
 PTransmitDataResult* response,
 google::protobuf::Closure* done) {
- PTransmitDataParams* new_request = new PTransmitDataParams();
- google::protobuf::Closure* new_done =
- new NewHttpClosure<PTransmitDataParams>(new_request, done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
- Status st = attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl);
- _transmit_block(cntl_base, new_request, response, new_done, st);
+ bool ret = _heavy_work_pool.try_offer([this, controller, response, done]() {
+ PTransmitDataParams* new_request = new PTransmitDataParams();
+ google::protobuf::Closure* new_done =
+ new NewHttpClosure<PTransmitDataParams>(new_request, done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+ Status st =
+ attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl);
+ _transmit_block(controller, new_request, response, new_done, st);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
-void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* controller,
 const PTransmitDataParams* request,
 PTransmitDataResult* response,
 google::protobuf::Closure* done,
@@ -762,25 +963,34 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
 const PCheckRPCChannelRequest* request,
 PCheckRPCChannelResponse* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(0);
- if (request->data().size() != request->size()) {
- std::stringstream ss;
- ss << "data size not same, expected: " << request->size()
- << ", actual: " << request->data().size();
- response->mutable_status()->add_error_msgs(ss.str());
- response->mutable_status()->set_status_code(1);
-
- } else {
- Md5Digest digest;
- digest.update(static_cast<const void*>(request->data().c_str()), request->data().size());
- digest.digest();
- if (!iequal(digest.hex(), request->md5())) {
+ bool ret = _light_work_pool.try_offer([request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(0);
+ if (request->data().size() != request->size()) {
std::stringstream ss;
- ss << "md5 not same, expected: " << request->md5() << ", actual: "
<< digest.hex();
+ ss << "data size not same, expected: " << request->size()
+ << ", actual: " << request->data().size();
response->mutable_status()->add_error_msgs(ss.str());
response->mutable_status()->set_status_code(1);
+
+ } else {
+ Md5Digest digest;
+ digest.update(static_cast<const void*>(request->data().c_str()),
+ request->data().size());
+ digest.digest();
+ if (!iequal(digest.hex(), request->md5())) {
+ std::stringstream ss;
+ ss << "md5 not same, expected: " << request->md5() << ",
actual: " << digest.hex();
+ response->mutable_status()->add_error_msgs(ss.str());
+ response->mutable_status()->set_status_code(1);
+ }
}
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
@@ -788,44 +998,60 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co
 const PResetRPCChannelRequest* request,
 PResetRPCChannelResponse* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(0);
- if (request->all()) {
- int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
- if (size > 0) {
- std::vector<std::string> endpoints;
- ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
- ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
- *response->mutable_channels() = {endpoints.begin(), endpoints.end()};
- }
- } else {
- for (const std::string& endpoint : request->endpoints()) {
- if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
- response->mutable_status()->add_error_msgs(endpoint + ": not found.");
- continue;
+ bool ret = _light_work_pool.try_offer([request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(0);
+ if (request->all()) {
+ int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
+ if (size > 0) {
+ std::vector<std::string> endpoints;
+ ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
+ ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
+ *response->mutable_channels() = {endpoints.begin(), endpoints.end()};
}
+ } else {
+ for (const std::string& endpoint : request->endpoints()) {
+ if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
+ response->mutable_status()->add_error_msgs(endpoint + ": not found.");
+ continue;
+ }
- if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
- response->add_channels(endpoint);
- } else {
- response->mutable_status()->add_error_msgs(endpoint + ": reset failed.");
+ if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
+ response->add_channels(endpoint);
+ } else {
+ response->mutable_status()->add_error_msgs(endpoint + ": reset failed.");
+ }
+ }
+ if (request->endpoints_size() != response->channels_size()) {
+ response->mutable_status()->set_status_code(1);
}
}
- if (request->endpoints_size() != response->channels_size()) {
- response->mutable_status()->set_status_code(1);
- }
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
}
-void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_base,
+void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controller,
const PHandShakeRequest* request,
PHandShakeResponse* response,
google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- if (request->has_hello()) {
- response->set_hello(request->hello());
+ bool ret = _light_work_pool.try_offer([request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ if (request->has_hello()) {
+ response->set_hello(request->hello());
+ }
+ response->mutable_status()->set_status_code(0);
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
}
- response->mutable_status()->set_status_code(0);
}
void PInternalServiceImpl::request_slave_tablet_pull_rowset(
@@ -840,7 +1066,8 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
int64_t brpc_port = request->brpc_port();
std::string token = request->token();
int64_t node_id = request->node_id();
- _slave_replica_worker_pool.offer([=]() {
+ bool ret = _heavy_work_pool.try_offer([rowset_meta_pb, host, brpc_port, node_id, segments_size,
+ http_port, token, rowset_path, this]() {
 TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
 rowset_meta_pb.tablet_id(), rowset_meta_pb.tablet_schema_hash());
if (tablet == nullptr) {
@@ -981,6 +1208,12 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
_response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
rowset_meta->tablet_id(), node_id, true);
});
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
Status::OK().to_protobuf(response->mutable_status());
}
@@ -1039,14 +1272,22 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
 void PInternalServiceImpl::response_slave_tablet_pull_rowset(
 google::protobuf::RpcController* controller, const PTabletWriteSlaveDoneRequest* request,
 PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) {
- brpc::ClosureGuard closure_guard(done);
- VLOG_CRITICAL
- << "receive the result of slave replica pull rowset from slave replica. slave server="
- << request->node_id() << ", is_succeed=" << request->is_succeed()
- << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id();
- StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
- request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed());
- Status::OK().to_protobuf(response->mutable_status());
+ bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ VLOG_CRITICAL << "receive the result of slave replica pull rowset from slave replica. "
+ "slave server="
+ << request->node_id() << ", is_succeed=" << request->is_succeed()
+ << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id();
+ StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
+ request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed());
+ Status::OK().to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
}
} // namespace doris
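
Every handler converted above follows the same template: the body is captured
in a lambda, offered to one of the two pthread pools, and if the pool rejects
the task the RPC is completed inline with CANCELLED so a full queue can never
block a brpc bthread. A minimal sketch of that template (the handler and
message names below are placeholders, not part of this patch):

    void PInternalServiceImpl::some_rpc(google::protobuf::RpcController* controller,
                                        const PExampleRequest* request,
                                        PExampleResult* response,
                                        google::protobuf::Closure* done) {
        bool ret = _light_work_pool.try_offer([this, request, response, done]() {
            // ClosureGuard runs `done` when the lambda returns, completing the RPC.
            brpc::ClosureGuard closure_guard(done);
            // ... fill `response` from `request` ...
            response->mutable_status()->set_status_code(0);
        });
        if (!ret) {
            // Queue full or pool shut down: finish the RPC on the brpc thread.
            brpc::ClosureGuard closure_guard(done);
            response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
            response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
        }
    }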
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 3ea3655974..e5855d98f3 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -194,8 +194,13 @@ private:
private:
ExecEnv* _exec_env;
- PriorityThreadPool _tablet_worker_pool;
- PriorityThreadPool _slave_replica_worker_pool;
+
+ // Every brpc service request should be put into a thread pool;
+ // see issue #16634 for the reason.
+ // Interfaces that read or write data are classified as heavy,
+ // everything else as light.
+ PriorityThreadPool _heavy_work_pool;
+ PriorityThreadPool _light_work_pool;
};
} // namespace doris
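
The heavy/light split replaces the old per-purpose pools. A sketch of how the
two members might be initialized (the thread and queue counts below are made
up for illustration; the real values come from the new options this patch adds
to be/src/common/config.h):

    PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
            : _exec_env(exec_env),
              // heavy pool: data-carrying RPCs such as transmit_block
              _heavy_work_pool(192, 10240, "brpc_heavy"),
              // light pool: control RPCs such as commit/rollback/hand_shake
              _light_work_pool(128, 10240, "brpc_light") {}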
diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp
index 29060613c8..197b709718 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -137,6 +137,18 @@ public:
return true;
}
+ // Return false if the queue is full or has been shut down.
+ bool try_put(const T& val) {
+ std::unique_lock<std::mutex> unique_lock(_lock);
+ if (_queue.size() < _max_element && !_shutdown) {
+ _queue.push(val);
+ unique_lock.unlock();
+ _get_cv.notify_one();
+ return true;
+ }
+ return false;
+ }
+
// Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put.
void shutdown() {
{
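
try_put is the non-blocking counterpart of blocking_put: it takes the lock
once and reports failure instead of waiting when the queue is at capacity or
already shut down. A usage sketch (a queue of ints with capacity 2, purely
illustrative):

    #include "util/blocking_priority_queue.hpp"

    void try_put_demo() {
        doris::BlockingPriorityQueue<int> queue(2); // max_element = 2
        bool a = queue.try_put(1); // true: queue has room
        bool b = queue.try_put(2); // true: queue is now full
        bool c = queue.try_put(3); // false: returns immediately instead of blocking
    }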
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 10bcd28393..786a5625d9 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -213,6 +213,16 @@ public:
IntCounter* upload_rowset_count;
IntCounter* upload_fail_count;
+ UIntGauge* light_work_pool_queue_size;
+ UIntGauge* heavy_work_pool_queue_size;
+ UIntGauge* heavy_work_active_threads;
+ UIntGauge* light_work_active_threads;
+
+ UIntGauge* heavy_work_pool_max_queue_size;
+ UIntGauge* light_work_pool_max_queue_size;
+ UIntGauge* heavy_work_max_threads;
+ UIntGauge* light_work_max_threads;
+
static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
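
The eight new gauges expose the capacity and occupancy of the two brpc pools.
A hedged sketch of how they can be kept in sync with the pools (the refresh
hook below is invented for illustration; the actual gauge registration goes
through the usual metric macros in doris_metrics.cpp, which this diff does not
show):

    void PInternalServiceImpl::_update_brpc_pool_metrics() {
        auto* metrics = DorisMetrics::instance();
        metrics->heavy_work_pool_queue_size->set_value(_heavy_work_pool.get_queue_size());
        metrics->heavy_work_active_threads->set_value(_heavy_work_pool.get_active_threads());
        metrics->light_work_pool_queue_size->set_value(_light_work_pool.get_queue_size());
        metrics->light_work_active_threads->set_value(_light_work_pool.get_active_threads());
    }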
diff --git a/be/src/util/hdfs_storage_backend.cpp b/be/src/util/hdfs_storage_backend.cpp
index 6f3baf2d96..083f7e3c21 100644
--- a/be/src/util/hdfs_storage_backend.cpp
+++ b/be/src/util/hdfs_storage_backend.cpp
@@ -17,6 +17,7 @@
#include "util/hdfs_storage_backend.h"
+#include "io/fs/err_utils.h"
#include "io/hdfs_file_reader.h"
#include "io/hdfs_reader_writer.h"
#include "io/hdfs_writer.h"
@@ -125,7 +126,7 @@ Status HDFSStorageBackend::list(const std::string& remote_path, bool contain_md5
std::string normal_str = parse_path(remote_path);
int exists = hdfsExists(_hdfs_fs, normal_str.c_str());
if (exists != 0) {
- LOG(INFO) << "path does not exist: " << normal_str << ", err: " <<
strerror(errno);
+ LOG(INFO) << "path does not exist: " << normal_str << ", err: " <<
io::hdfs_error();
return Status::OK();
}
@@ -134,7 +135,7 @@ Status HDFSStorageBackend::list(const std::string& remote_path, bool contain_md5
if (files_info == nullptr) {
std::stringstream ss;
ss << "failed to list files from remote path: " << normal_str
- << ", err: " << strerror(errno);
+ << ", err: " << io::hdfs_error();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
diff --git a/be/src/util/hdfs_storage_backend.h b/be/src/util/hdfs_storage_backend.h
index acbf18d2d0..9f1b27f6ac 100644
--- a/be/src/util/hdfs_storage_backend.h
+++ b/be/src/util/hdfs_storage_backend.h
@@ -17,8 +17,7 @@
#pragma once
-#include <hdfs/hdfs.h>
-
+#include "io/fs/hdfs.h"
#include "io/hdfs_builder.h"
#include "util/storage_backend.h"
diff --git a/be/src/util/hdfs_util.cpp b/be/src/util/hdfs_util.cpp
index b58fc75e46..e72b08caef 100644
--- a/be/src/util/hdfs_util.cpp
+++ b/be/src/util/hdfs_util.cpp
@@ -21,6 +21,7 @@
#include "common/config.h"
#include "common/logging.h"
+#include "io/fs/err_utils.h"
namespace doris {
@@ -33,7 +34,7 @@ hdfsFS HDFSHandle::create_hdfs_fs(HDFSCommonBuilder& hdfs_builder) {
hdfsFS hdfs_fs = hdfsBuilderConnect(hdfs_builder.get());
if (hdfs_fs == nullptr) {
LOG(WARNING) << "connect to hdfs failed."
- << ", error: " << hdfsGetLastError();
+ << ", error: " << io::hdfs_error();
return nullptr;
}
return hdfs_fs;
diff --git a/be/src/util/hdfs_util.h b/be/src/util/hdfs_util.h
index f7bfc14b3a..a872cfe89d 100644
--- a/be/src/util/hdfs_util.h
+++ b/be/src/util/hdfs_util.h
@@ -17,13 +17,12 @@
#pragma once
-#include <hdfs/hdfs.h>
-
#include <map>
#include <memory>
#include <string>
#include "common/status.h"
+#include "io/fs/hdfs.h"
#include "io/hdfs_builder.h"
namespace doris {
@@ -40,4 +39,4 @@ private:
HDFSHandle() {}
};
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 64a8e09bec..01558cb2a9 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -24,6 +24,8 @@
#include <cstdlib>
#include <filesystem>
#include <sstream>
+#include <string>
+#include <vector>
#include "common/config.h"
#include "gutil/once.h"
@@ -37,10 +39,10 @@ namespace doris {
namespace {
JavaVM* g_vm;
-GoogleOnceType g_vm_once = GOOGLE_ONCE_INIT;
+[[maybe_unused]] std::once_flag g_vm_once;
const std::string GetDorisJNIClasspath() {
- const auto* classpath = getenv("DORIS_JNI_CLASSPATH_PARAMETER");
+ const auto* classpath = getenv("DORIS_CLASSPATH");
if (classpath) {
return classpath;
} else {
@@ -66,84 +68,50 @@ const std::string GetDorisJNIClasspath() {
}
}
-void FindOrCreateJavaVM() {
+// Only used on non-x86 platform
+[[maybe_unused]] void FindOrCreateJavaVM() {
int num_vms;
- int rv = LibJVMLoader::JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms);
+ int rv = JNI_GetCreatedJavaVMs(&g_vm, 1, &num_vms);
if (rv == 0) {
- JavaVMOption* options;
- auto classpath = GetDorisJNIClasspath();
- // The following 4 opts are default opts,
- // they can be override by JAVA_OPTS env var.
- std::string heap_size = fmt::format("-Xmx{}", config::jvm_max_heap_size);
- std::string log_path = fmt::format("-DlogPath={}/log/udf-jdbc.log", getenv("DORIS_HOME"));
- std::string critical_jni = "-XX:-CriticalJNINatives";
- std::string max_fd_limit = "-XX:-MaxFDLimit";
+ std::vector<std::string> options;
char* java_opts = getenv("JAVA_OPTS");
- int no_args;
if (java_opts == nullptr) {
- no_args = 4; // classpath, heapsize, log path, critical
+ options = {
+ GetDorisJNIClasspath(), fmt::format("-Xmx{}", "1g"),
+ fmt::format("-DlogPath={}/log/jni.log",
getenv("DORIS_HOME")),
+ fmt::format("-Dsun.java.command={}", "DorisBE"),
"-XX:-CriticalJNINatives",
#ifdef __APPLE__
- no_args++; // -XX:-MaxFDLimit
-#endif
- options = (JavaVMOption*)calloc(no_args, sizeof(JavaVMOption));
- options[0].optionString = const_cast<char*>(classpath.c_str());
- options[1].optionString = const_cast<char*>(heap_size.c_str());
- options[2].optionString = const_cast<char*>(log_path.c_str());
- options[3].optionString = const_cast<char*>(critical_jni.c_str());
-#ifdef __APPLE__
- // On macOS, we should disable MaxFDLimit, otherwise the RLIMIT_NOFILE
- // will be assigned the minimum of OPEN_MAX (10240) and rlim_cur (See src/hotspot/os/bsd/os_bsd.cpp)
- // and it can not pass the check performed by storage engine.
- // The newer JDK has fixed this issue.
- options[4].optionString = const_cast<char*>(max_fd_limit.c_str());
+ // On macOS, we should disable MaxFDLimit, otherwise the RLIMIT_NOFILE
+ // will be assigned the minimum of OPEN_MAX (10240) and rlim_cur (See src/hotspot/os/bsd/os_bsd.cpp)
+ // and it can not pass the check performed by storage engine.
+ // The newer JDK has fixed this issue.
+ "-XX:-MaxFDLimit"
#endif
+ };
} else {
- // user specified opts
- // 1. find the number of args
- java_opts = strdup(java_opts);
- char *str, *token, *save_ptr;
- char jvm_arg_delims[] = " ";
- for (no_args = 1, str = java_opts;; no_args++, str = nullptr) {
- token = strtok_r(str, jvm_arg_delims, &save_ptr);
- if (token == nullptr) {
- break;
- }
- }
- free(java_opts);
- // 2. set args
- options = (JavaVMOption*)calloc(no_args, sizeof(JavaVMOption));
- options[0].optionString = const_cast<char*>(classpath.c_str());
- java_opts = getenv("JAVA_OPTS");
- if (java_opts != NULL) {
- java_opts = strdup(java_opts);
- for (no_args = 1, str = java_opts;; no_args++, str = nullptr) {
- token = strtok_r(str, jvm_arg_delims, &save_ptr);
- if (token == nullptr) {
- break;
- }
- options[no_args].optionString = token;
- }
- }
+ std::istringstream stream(java_opts);
+ options = std::vector<std::string>(std::istream_iterator<std::string> {stream},
+ std::istream_iterator<std::string>());
+ options.push_back(GetDorisJNIClasspath());
+ }
+ std::unique_ptr<JavaVMOption[]> jvm_options(new JavaVMOption[options.size()]);
+ for (int i = 0; i < options.size(); ++i) {
+ jvm_options[i] = {const_cast<char*>(options[i].c_str()), nullptr};
}
JNIEnv* env;
JavaVMInitArgs vm_args;
vm_args.version = JNI_VERSION_1_8;
- vm_args.options = options;
- vm_args.nOptions = no_args;
+ vm_args.options = jvm_options.get();
+ vm_args.nOptions = options.size();
// Set it to JNI_FALSE because JNI_TRUE will let JVM ignore the max size config.
vm_args.ignoreUnrecognized = JNI_FALSE;
- jint res = LibJVMLoader::JNI_CreateJavaVM(&g_vm, (void**)&env, &vm_args);
+ jint res = JNI_CreateJavaVM(&g_vm, (void**)&env, &vm_args);
if (JNI_OK != res) {
DCHECK(false) << "Failed to create JVM, code= " << res;
}
-
- if (java_opts != nullptr) {
- free(java_opts);
- }
- free(options);
} else {
CHECK_EQ(rv, 0) << "Could not find any created Java VM";
CHECK_EQ(num_vms, 1) << "No VMs returned";
@@ -196,7 +164,8 @@ Status JniLocalFrame::push(JNIEnv* env, int max_local_ref) {
Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) {
DCHECK(!tls_env_) << "Call GetJNIEnv() fast path";
- GoogleOnceInit(&g_vm_once, &FindOrCreateJavaVM);
+#ifdef USE_LIBHDFS3
+ std::call_once(g_vm_once, FindOrCreateJavaVM);
int rc = g_vm->GetEnv(reinterpret_cast<void**>(&tls_env_), JNI_VERSION_1_8);
if (rc == JNI_EDETACHED) {
rc = g_vm->AttachCurrentThread((void**)&tls_env_, nullptr);
@@ -204,6 +173,10 @@ Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) {
if (rc != 0 || tls_env_ == nullptr) {
return Status::InternalError("Unable to get JVM: {}", rc);
}
+#else
+ // hadoop libhdfs creates the JVM and attaches threads itself
+ tls_env_ = getJNIEnv();
+#endif
*env = tls_env_;
return Status::OK();
}
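
The rewritten FindOrCreateJavaVM drops the manual strtok_r bookkeeping and
tokenizes JAVA_OPTS with istream_iterator instead. The same technique in
isolation (standard C++ only; the example value is invented):

    #include <iterator>
    #include <sstream>
    #include <string>
    #include <vector>

    // Whitespace-delimited tokenization, as used for JAVA_OPTS above.
    std::vector<std::string> split_opts(const std::string& java_opts) {
        std::istringstream stream(java_opts);
        return std::vector<std::string>(std::istream_iterator<std::string>{stream},
                                        std::istream_iterator<std::string>());
    }
    // split_opts("-Xmx1024m -XX:-CriticalJNINatives")
    //     -> {"-Xmx1024m", "-XX:-CriticalJNINatives"}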
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index 0e551f17cf..dfef2d3be7 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -23,6 +23,11 @@
#include "gutil/macros.h"
#include "util/thrift_util.h"
+#ifdef USE_HADOOP_HDFS
+// defined in hadoop_hdfs/hdfs.h
+extern "C" JNIEnv* getJNIEnv(void);
+#endif
+
namespace doris {
#define RETURN_ERROR_IF_EXC(env) \
diff --git a/be/src/util/libjvm_loader.cpp b/be/src/util/libjvm_loader.cpp
index 127d28c2de..6175da6081 100644
--- a/be/src/util/libjvm_loader.cpp
+++ b/be/src/util/libjvm_loader.cpp
@@ -25,6 +25,15 @@
#include "common/status.h"
+_JNI_IMPORT_OR_EXPORT_ jint JNICALL JNI_GetCreatedJavaVMs(JavaVM** vm_buf, jsize bufLen,
+ jsize* numVMs) {
+ return doris::LibJVMLoader::JNI_GetCreatedJavaVMs(vm_buf, bufLen, numVMs);
+}
+
+_JNI_IMPORT_OR_EXPORT_ jint JNICALL JNI_CreateJavaVM(JavaVM** pvm, void** penv, void* args) {
+ return doris::LibJVMLoader::JNI_CreateJavaVM(pvm, penv, args);
+}
+
namespace {
#ifndef __APPLE__
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index 32fc637970..1c1bf8117e 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -54,7 +54,7 @@ public:
 // queue exceeds this size, subsequent calls to Offer will block until there is
 // capacity available.
 PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name)
- : _work_queue(queue_size), _shutdown(false), _name(name) {
+ : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
for (int i = 0; i < num_threads; ++i) {
_threads.create_thread(
std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
@@ -86,6 +86,11 @@ public:
return _work_queue.blocking_put(task);
}
+ virtual bool try_offer(WorkFunction func) {
+ PriorityThreadPool::Task task = {0, func, 0};
+ return _work_queue.try_put(task);
+ }
+
// Shuts the thread pool down, causing the work queue to cease accepting offered work
// and the worker threads to terminate once they have processed their current work item.
// Returns once the shutdown flag has been set, does not wait for the threads to
@@ -100,6 +105,7 @@ public:
virtual void join() { _threads.join_all(); }
virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
+ virtual uint32_t get_active_threads() const { return _active_threads; }
// Blocks until the work queue is empty, and then calls shutdown to stop the worker
// threads and Join to wait until they are finished.
@@ -135,7 +141,9 @@ private:
while (!is_shutdown()) {
Task task;
if (_work_queue.blocking_get(&task)) {
+ _active_threads++;
task.work_function();
+ _active_threads--;
}
if (_work_queue.get_size() == 0) {
_empty_cv.notify_all();
@@ -150,6 +158,7 @@ private:
// Set to true when threads should stop doing work and terminate.
std::atomic<bool> _shutdown;
std::string _name;
+ std::atomic<int> _active_threads;
};
} // namespace doris
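
try_offer mirrors try_put one level up: the callable is wrapped in a
zero-priority Task and handed to the queue, so callers get an immediate
rejection signal instead of back-pressure. A usage sketch (sizes arbitrary):

    #include "util/priority_thread_pool.hpp"

    void try_offer_demo() {
        doris::PriorityThreadPool pool(4, 256, "demo"); // threads, queue size, name
        bool accepted = pool.try_offer([]() {
            // Runs on a pool pthread, so blocking on a std::mutex here is safe.
        });
        if (!accepted) {
            // Queue full or pool shut down: the caller must handle the rejection.
        }
    }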
diff --git a/bin/start_be.sh b/bin/start_be.sh
index 89c40947c0..231986940e 100755
--- a/bin/start_be.sh
+++ b/bin/start_be.sh
@@ -20,7 +20,8 @@ set -eo pipefail
curdir="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
-if [[ "$(uname -s)" == 'Darwin' ]] && command -v brew &>/dev/null; then
+MACHINE_OS=$(uname -s)
+if [[ "${MACHINE_OS}" == 'Darwin' ]] && command -v brew &>/dev/null; then
PATH="$(brew --prefix)/opt/gnu-getopt/bin:${PATH}"
export PATH
fi
@@ -70,16 +71,36 @@ if [[ "$(uname -s)" != 'Darwin' ]]; then
fi
fi
-# add libs to CLASSPATH
+# add java libs
for f in "${DORIS_HOME}/lib"/*.jar; do
- if [[ -z "${DORIS_JNI_CLASSPATH_PARAMETER}" ]]; then
- export DORIS_JNI_CLASSPATH_PARAMETER="${f}"
+ if [[ -z "${DORIS_CLASSPATH}" ]]; then
+ export DORIS_CLASSPATH="${f}"
else
- export
DORIS_JNI_CLASSPATH_PARAMETER="${f}:${DORIS_JNI_CLASSPATH_PARAMETER}"
+ export DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
fi
done
-# DORIS_JNI_CLASSPATH_PARAMETER is used to configure additional jar path to jvm. e.g. -Djava.class.path=$DORIS_HOME/lib/java-udf.jar
-export DORIS_JNI_CLASSPATH_PARAMETER="-Djava.class.path=${DORIS_JNI_CLASSPATH_PARAMETER}"
+
+if [[ -d "${DORIS_HOME}/lib/hadoop_hdfs/" ]]; then
+ # add hadoop libs
+ for f in "${DORIS_HOME}/lib/hadoop_hdfs/common"/*.jar; do
+ DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+ done
+ for f in "${DORIS_HOME}/lib/hadoop_hdfs/common/lib"/*.jar; do
+ DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+ done
+ for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs"/*.jar; do
+ DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+ done
+ for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs/lib"/*.jar; do
+ DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+ done
+fi
+
+# CLASSPATH and LIBHDFS_OPTS are used by hadoop libhdfs;
+# the conf/ dir is included so that hadoop libhdfs can read the .xml config files in conf/
+export CLASSPATH="${DORIS_HOME}/conf/:${DORIS_CLASSPATH}"
+# DORIS_CLASSPATH is for self-managed jni
+export DORIS_CLASSPATH="-Djava.class.path=${DORIS_CLASSPATH}"
jdk_version() {
local java_cmd="${1}"
@@ -230,11 +251,28 @@ set_tcmalloc_heap_limit() {
# set_tcmalloc_heap_limit || exit 1
-## set hdfs conf
+## set hdfs3 conf
if [[ -f "${DORIS_HOME}/conf/hdfs-site.xml" ]]; then
export LIBHDFS3_CONF="${DORIS_HOME}/conf/hdfs-site.xml"
fi
+if [[ -z ${JAVA_OPTS} ]]; then
+ # set default JAVA_OPTS
+ CUR_DATE=$(date +%Y%m%d-%H%M%S)
+ JAVA_OPTS="-Xmx1024m -DlogPath=${DORIS_HOME}/log/jni.log
-Xloggc:${DORIS_HOME}/log/be.gc.log.${CUR_DATE} -Dsun.java.command=DorisBE
-XX:-CriticalJNINatives"
+fi
+
+if [[ "${MACHINE_OS}" == "Darwin" ]]; then
+ JAVA_OPTS="${JAVA_OPTS} -XX:-MaxFDLimit"
+fi
+
+# set LIBHDFS_OPTS for hadoop libhdfs
+export LIBHDFS_OPTS="${JAVA_OPTS}"
+
+#echo "CLASSPATH: ${CLASSPATH}"
+#echo "LD_LIBRARY_PATH: ${LD_LIBRARY_PATH}"
+#echo "LIBHDFS_OPTS: ${LIBHDFS_OPTS}"
+
# see https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:30000,dirty_decay_ms:30000,oversize_threshold:0,lg_tcache_max:16,prof_prefix:jeprof.out"
diff --git a/build.sh b/build.sh
index 9eaa16d8b6..d8296e9bbf 100755
--- a/build.sh
+++ b/build.sh
@@ -539,6 +539,10 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then
cp -r -p "${DORIS_HOME}/be/output/bin"/* "${DORIS_OUTPUT}/be/bin"/
cp -r -p "${DORIS_HOME}/be/output/conf"/* "${DORIS_OUTPUT}/be/conf"/
+ if [[ -d "${DORIS_THIRDPARTY}/installed/lib/hadoop_hdfs/" ]]; then
+ cp -r -p "${DORIS_THIRDPARTY}/installed/lib/hadoop_hdfs/"
"${DORIS_OUTPUT}/be/lib/"
+ fi
+
if [[ "${BUILD_JAVA_UDF}" -eq 0 ]]; then
echo -e "\033[33;1mWARNNING: \033[37;1mDisable Java UDF support in
be.conf due to the BE was built without Java UDF.\033[0m"
cat >>"${DORIS_OUTPUT}/be/conf/be.conf" <<EOF
diff --git a/conf/be.conf b/conf/be.conf
index 240e7ee920..cc1b8f6c59 100644
--- a/conf/be.conf
+++ b/conf/be.conf
@@ -17,8 +17,8 @@
PPROF_TMPDIR="$DORIS_HOME/log/"
-# if JAVA_OPTS is set, it will override the jvm opts for BE jvm.
-#JAVA_OPTS="-Xmx8192m -DlogPath=$DORIS_HOME/log/udf-jdbc.log
-Djava.compiler=NONE -XX::-CriticalJNINatives"
+DATE = `date +%Y%m%d-%H%M%S`
+JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log
-Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Dsun.java.command=DorisBE
-XX:-CriticalJNINatives"
# since 1.2, the JAVA_HOME need to be set to run BE process.
# JAVA_HOME=/path/to/jdk/
diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 610125abe3..464c59c309 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -295,6 +295,12 @@ curl http://be_host:webserver_port/metrics?type=json
|`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
|`doris_be_all_rowsets_num`| | Num | 当前所有 rowset 的个数 | | P0 |
|`doris_be_all_segments_num`| | Num | 当前所有 segment 的个数 | | P0 |
+|`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数| | p0 |
+|`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数| | p0 |
+|`doris_be_heavy_work_pool_queue_size`| | Num | brpc heavy线程池队列最大长度,超过则阻塞提交work| | p0 |
+|`doris_be_light_work_pool_queue_size`| | Num | brpc light线程池队列最大长度,超过则阻塞提交work| | p0 |
+|`doris_be_heavy_work_active_threads`| | Num | brpc heavy线程池活跃线程数| | p0 |
+|`doris_be_light_work_active_threads`| | Num | brpc light线程池活跃线程数| | p0 |
### 机器监控
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index 28ecbea7a7..44c7bc967e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -271,7 +271,7 @@ public class BeLoadRebalancer extends Rebalancer {
if (lowBackend == null) {
continue;
}
- if (hosts.contains(lowBackend.getHost())) {
+ if (!Config.allow_replica_on_same_host && hosts.contains(lowBackend.getHost())) {
continue;
}
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 95e1887b26..197d38dc78 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -243,6 +243,7 @@ enum PCacheStatus {
INVALID_KEY_RANGE = 6;
DATA_OVERDUE = 7;
EMPTY_DATA = 8;
+ CANCELED = 9;
};
enum CacheType {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]