This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2658914bc6e Pick "[fix](IO) Fix async close's raii and lazily load jni's jvm max heap value #34799" (#34815)
2658914bc6e is described below
commit 2658914bc6e66c87b1aa7cfdef8970dedc0ff74b
Author: AlexYue <[email protected]>
AuthorDate: Tue May 14 14:08:53 2024 +0800
Pick "[fix](IO) Fix async close's raii and lazily load jni's jvm max heap value #34799" (#34815)
---
be/src/io/fs/hdfs_file_writer.cpp | 2 +-
be/src/io/fs/s3_file_writer.cpp | 2 +-
be/src/runtime/exec_env_init.cpp | 2 -
be/src/util/jni-util.cpp | 11 +++---
be/test/io/fs/hdfs_file_system_test.cpp | 66 +++++++++++++++------------------
5 files changed, 37 insertions(+), 46 deletions(-)
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index cad3dd35dbe..01744c7bde3 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -144,7 +144,7 @@ HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler,
HdfsFileWriter::~HdfsFileWriter() {
if (_async_close_pack != nullptr) {
// For thread safety
- std::ignore = _async_close_pack->promise.get_future();
+ std::ignore = _async_close_pack->future.get();
_async_close_pack = nullptr;
}
if (_hdfs_file) {
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 644440923de..852b5258e85 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -106,7 +106,7 @@ S3FileWriter::S3FileWriter(std::shared_ptr<Aws::S3::S3Client> client, std::strin
S3FileWriter::~S3FileWriter() {
if (_async_close_pack != nullptr) {
// For thread safety
- std::ignore = _async_close_pack->promise.get_future();
+ std::ignore = _async_close_pack->future.get();
_async_close_pack = nullptr;
}
// We won't do S3 abort operation in BE, we let s3 service do it own.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index ac45fa3d136..a181e607767 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -155,8 +155,6 @@ static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
DorisMetrics::instance()->initialize(init_system_metrics, disk_devices,
network_interfaces);
}
-
-
ThreadPool* ExecEnv::non_block_close_thread_pool() {
#ifdef BE_TEST
return get_non_block_close_thread_pool();
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 4ef0ffc7acb..3c8f2b0a30f 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -208,13 +208,14 @@ Status JniLocalFrame::push(JNIEnv* env, int max_local_ref) {
}
void JniUtil::parse_max_heap_memory_size_from_jvm(JNIEnv* env) {
- // The start_be.sh would set JAVA_OPTS
- std::string java_opts = getenv("JAVA_OPTS") ? getenv("JAVA_OPTS") : "";
+ // The start_be.sh would set JAVA_OPTS inside LIBHDFS_OPTS
+ std::string java_opts = getenv("LIBHDFS_OPTS") ? getenv("LIBHDFS_OPTS") :
"";
std::istringstream iss(java_opts);
std::string opt;
while (iss >> opt) {
if (opt.find("-Xmx") == 0) {
std::string xmxValue = opt.substr(4);
+ LOG(INFO) << "The max heap vaule is " << xmxValue;
char unit = xmxValue.back();
xmxValue.pop_back();
long long value = std::stoll(xmxValue);
@@ -247,6 +248,9 @@ size_t JniUtil::get_max_jni_heap_memory_size() {
#if defined(USE_LIBHDFS3) || defined(BE_TEST)
return std::numeric_limits<size_t>::max();
#else
+ static std::once_flag parse_max_heap_memory_size_from_jvm_flag;
+ std::call_once(parse_max_heap_memory_size_from_jvm_flag,
parse_max_heap_memory_size_from_jvm,
+ tls_env_);
return max_jvm_heap_memory_size_;
#endif
}
@@ -267,9 +271,6 @@ Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) {
// the hadoop libhdfs will do all the stuff
SetEnvIfNecessary();
tls_env_ = getJNIEnv();
- if (nullptr != tls_env_) {
- parse_max_heap_memory_size_from_jvm(tls_env_);
- }
#endif
*env = tls_env_;
return Status::OK();
diff --git a/be/test/io/fs/hdfs_file_system_test.cpp b/be/test/io/fs/hdfs_file_system_test.cpp
index 51a308f45ed..b741a19bf79 100644
--- a/be/test/io/fs/hdfs_file_system_test.cpp
+++ b/be/test/io/fs/hdfs_file_system_test.cpp
@@ -43,43 +43,35 @@ TEST(HdfsFileSystemTest, Write) {
st = local_fs->create_file(fmt::format("{}/mock_hdfs_file", test_dir),
&local_file_writer);
ASSERT_TRUE(st.ok()) << st;
- sp->set_call_back(
- "HdfsFileWriter::close::hdfsHSync",
- [](auto&& args) {
- auto* ret = try_any_cast_ret<int>(args);
- ret->first = 0; // noop, return success
- ret->second = true;
- });
-
- sp->set_call_back(
- "HdfsFileWriter::close::hdfsCloseFile",
- [&](auto&& args) {
- auto st = local_file_writer->close();
- ASSERT_TRUE(st.ok()) << st;
- auto* ret = try_any_cast_ret<int>(args);
- ret->first = 0; // return success
- ret->second = true;
- });
-
- sp->set_call_back(
- "HdfsFileWriter::append_hdfs_file::hdfsWrite",
- [&](auto&& args) {
- auto content = try_any_cast<std::string_view>(args[0]);
- auto st = local_file_writer->append({content.data(),
content.size()});
- ASSERT_TRUE(st.ok()) << st;
- auto* ret = try_any_cast_ret<int>(args);
- ret->first = content.size(); // return bytes written
- ret->second = true;
- });
-
- sp->set_call_back(
- "HdfsFileWriter::finalize::hdfsFlush",
- [](auto&& args) {
- auto* ret = try_any_cast_ret<int>(args);
- ret->first = 0; // noop, return success
- ret->second = true;
- });
-
+ sp->set_call_back("HdfsFileWriter::close::hdfsHSync", [](auto&& args) {
+ auto* ret = try_any_cast_ret<int>(args);
+ ret->first = 0; // noop, return success
+ ret->second = true;
+ });
+
+ sp->set_call_back("HdfsFileWriter::close::hdfsCloseFile", [&](auto&& args)
{
+ auto st = local_file_writer->close();
+ ASSERT_TRUE(st.ok()) << st;
+ auto* ret = try_any_cast_ret<int>(args);
+ ret->first = 0; // return success
+ ret->second = true;
+ });
+
+ sp->set_call_back("HdfsFileWriter::append_hdfs_file::hdfsWrite",
[&](auto&& args) {
+ auto content = try_any_cast<std::string_view>(args[0]);
+ auto st = local_file_writer->append({content.data(), content.size()});
+ ASSERT_TRUE(st.ok()) << st;
+ auto* ret = try_any_cast_ret<int>(args);
+ ret->first = content.size(); // return bytes written
+ ret->second = true;
+ });
+
+ sp->set_call_back("HdfsFileWriter::finalize::hdfsFlush", [](auto&& args) {
+ auto* ret = try_any_cast_ret<int>(args);
+ ret->first = 0; // noop, return success
+ ret->second = true;
+ });
+
Defer defer {[&]() {
sp->clear_call_back("HdfsFileWriter::close::hdfsHSync");
sp->clear_call_back("HdfsFileWriter::close::hdfsCloseFile");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]