platoneko commented on code in PR #34354:
URL: https://github.com/apache/doris/pull/34354#discussion_r1597342914
##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -45,6 +49,68 @@ bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_written");
static constexpr size_t MB = 1024 * 1024;
+static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
+
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is currently being used.
+// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS.
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile
+// which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller.
+// HdfsWriteMemUsageRecorder would reduce the mem usage at that time.
+// If the current usage exceeds the maximum set by the user, the current mem acquire would return failure.
+// The caller could do sleep to wait for free memory.
+class HdfsWriteMemUsageRecorder {
+public:
+ HdfsWriteMemUsageRecorder() = default;
+ ~HdfsWriteMemUsageRecorder() = default;
+ size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size() *
+                                   config::max_hdfs_wirter_jni_heap_usage_ratio);
+ }
+ Status acquire_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+ return Status::OK();
+#elif BE_TEST
+ return Status::OK();
+#else
+
+ std::unique_lock lck {cur_memory_latch};
+        cv.wait_for(lck, std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds),
+                    [&]() { return cur_memory_comsuption + memory_size <= max_usage(); });
+ if (cur_memory_comsuption + memory_size > max_usage()) {
+ return Status::InternalError(
+                    "Run out of Jni jvm heap space, current limit size is {}, max heap size is {}, "
+                    "ratio is {}",
+                    max_usage(), max_jvm_heap_size(), config::max_hdfs_wirter_jni_heap_usage_ratio);
+ }
+ cur_memory_comsuption += memory_size;
+ return Status::OK();
+#endif
+ }
+
+ void release_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+ return;
+#elif BE_TEST
+#else
+ std::unique_lock lck {cur_memory_latch};
+ size_t origin_size = cur_memory_comsuption;
+ cur_memory_comsuption -= memory_size;
+        if (cur_memory_comsuption <= max_usage() && origin_size > max_usage()) {
+ cv.notify_one();
Review Comment:
notify_all
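
For context on `notify_all`: each blocked writer waits on its own `memory_size`, so waking a single waiter whose request still does not fit can leave smaller requests asleep even though they would now pass. A minimal sketch of the release path with the suggestion applied, reusing the member names (and their spellings) from the diff above; everything else is an assumption:

```cpp
// Sketch only: wake every blocked writer so each one re-evaluates its own
// predicate (cur_memory_comsuption + its memory_size <= max_usage()).
void release_memory(size_t memory_size) {
    std::unique_lock lck {cur_memory_latch};
    size_t origin_size = cur_memory_comsuption;
    cur_memory_comsuption -= memory_size;
    if (cur_memory_comsuption <= max_usage() && origin_size > max_usage()) {
        cv.notify_all(); // notify_all instead of notify_one
    }
}
```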
##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -46,6 +49,52 @@ bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_writt
static constexpr size_t MB = 1024 * 1024;
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is currently being used.
+// The HdfsWriteRateLimit class increments a recorded value during hdfsWrite when writing to HDFS.
+// When hdfsCloseFile is called, all related memory in the JVM will be invalidated, so the recorded value can be decreased at that time.
+// If the current usage exceeds the maximum set by the user, the current write will sleep.
+// If the number of sleeps exceeds the number specified by the user, then the current write is considered to have failed
+class HdfsWriteRateLimit {
+public:
+ HdfsWriteRateLimit()
+ : max_jvm_heap_size(JniUtil::get_max_jni_heap_memory_size()),
+ cur_memory_comsuption(0) {}
+ size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size * config::max_hdfs_jni_heap_usage_ratio);
+ }
+ Status do_rate_limit(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+ return Status::OK();
+#endif
+        for (int retry_time = config::hsfs_jni_write_max_retry_time; retry_time != 0;
Review Comment:
```suggestion
        for (int retry_time = config::hdfs_jni_write_max_retry_time; retry_time > 0;
```
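
The shape the suggestion produces, shown as a standalone sketch so the bounded sleep-and-retry idea is easy to follow in isolation; the function name, limits, and sleep interval below are placeholders, not the PR's config values:

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>

// Illustrative only: try to reserve `bytes` against a shared counter, sleeping
// between attempts and giving up once the retry budget is spent.
bool try_reserve(std::atomic<size_t>& used, size_t bytes, size_t max_bytes,
                 int max_retries, std::chrono::milliseconds sleep_per_retry) {
    for (int retry = max_retries; retry > 0; --retry) { // `> 0`, as in the suggestion
        if (used.load(std::memory_order_relaxed) + bytes > max_bytes) {
            std::this_thread::sleep_for(sleep_per_retry);
            continue;
        }
        used.fetch_add(bytes, std::memory_order_relaxed);
        return true;
    }
    return false; // the caller maps this to an error status
}
```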
##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -44,7 +47,85 @@ bvar::Adder<uint64_t> hdfs_bytes_written_total("hdfs_file_writer_bytes_written")
bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_written");
+class HdfsMaxConnectionLimiter {
+public:
+ HdfsMaxConnectionLimiter() = default;
+ ~HdfsMaxConnectionLimiter() = default;
+
+ void add_inflight_writer() {
+ std::unique_lock lck {_latch};
+ _cv.wait(lck, [&]() {
+            return _cur_inflight_writer < config::max_inflight_hdfs_write_connection;
+ });
+ _cur_inflight_writer++;
+ }
+
+ void reduce_inflight_writer() {
+ std::unique_lock lck {_latch};
+ _cur_inflight_writer--;
+ _cv.notify_one();
+ }
+
+private:
+ std::mutex _latch;
+ std::condition_variable _cv;
+ int64_t _cur_inflight_writer {0};
+};
+
static constexpr size_t MB = 1024 * 1024;
+static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
+
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is currently being used.
+// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS.
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile
+// which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller.
+// HdfsWriteMemUsageRecorder would reduce the mem usage at that time.
+// If the current usage exceeds the maximum set by the user, the current mem acquire would return failure.
+// The caller could do sleep to wait for free memory.
+class HdfsWriteMemUsageRecorder {
+public:
+ HdfsWriteMemUsageRecorder()
+            : max_jvm_heap_size(JniUtil::get_max_jni_heap_memory_size()), cur_memory_comsuption(0) {
+ LOG_INFO("the max jvm heap size is {}", max_jvm_heap_size);
+ }
+ ~HdfsWriteMemUsageRecorder() = default;
+ size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size *
+                                   config::max_hdfs_wirter_jni_heap_usage_ratio);
+ }
+ Status acquire_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+ return Status::OK();
+#elif BE_TEST
+ return Status::OK();
+#else
+ if (cur_memory_comsuption + memory_size > max_usage()) {
+            return Status::InternalError("Run out of Jni jvm heap space, current limit size is {}",
Review Comment:
```suggestion
            return Status::InternalError<false>("Run out of Jni jvm heap space, current limit size is {}",
```
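
In this version `acquire_memory` fails fast, and the class comment says the caller could sleep and wait for free memory; per the later comment in this thread, `InternalError<false>` also skips stacktrace collection, which suits a failure that is expected under memory pressure. A hypothetical caller-side backoff might look like the sketch below; the helper name, retry count, and sleep interval are illustrative and not part of the PR:

```cpp
// Hypothetical caller-side backoff around the recorder; not PR code.
Status acquire_with_backoff(HdfsWriteMemUsageRecorder& recorder, size_t bytes) {
    Status st = Status::OK();
    for (int attempt = 0; attempt < 3; ++attempt) { // illustrative retry budget
        st = recorder.acquire_memory(bytes);
        if (st.ok()) {
            return st;
        }
        // give other writers time to flush/close and release JVM memory
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return st; // still over the JNI heap budget after all retries
}
```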
##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -71,11 +137,52 @@ HdfsFileWriter::~HdfsFileWriter() {
if (_hdfs_file) {
SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+ _flush_and_reset_approximate_jni_buffer_size();
}
hdfs_file_being_written << -1;
}
+void HdfsFileWriter::_flush_and_reset_approximate_jni_buffer_size() {
+ g_hdfs_write_rate_limiter.release_memory(_approximate_jni_buffer_size);
+ _approximate_jni_buffer_size = 0;
+}
+
+Status HdfsFileWriter::_acquire_jni_memory(size_t size) {
+#ifdef USE_LIBHDFS3
+ return Status::OK();
+#else
+ size_t actual_size = std::max(CLIENT_WRITE_PACKET_SIZE, size);
+    if (auto st = g_hdfs_write_rate_limiter.acquire_memory(actual_size); !st.ok()) {
+ int ret;
Review Comment:
Check `_approximate_jni_buffer_size > 0` before calling HFlush.
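
A sketch of the shape that comment asks for: only pay for an `hdfsHFlush` when this writer has actually recorded buffered bytes. The names come from the diff, but the error handling and the retry of the acquisition after the flush are assumptions, and the `USE_LIBHDFS3` branch is omitted for brevity:

```cpp
// Sketch, not PR code: guard the flush on _approximate_jni_buffer_size > 0.
Status HdfsFileWriter::_acquire_jni_memory(size_t size) {
    size_t actual_size = std::max(CLIENT_WRITE_PACKET_SIZE, size);
    if (auto st = g_hdfs_write_rate_limiter.acquire_memory(actual_size); !st.ok()) {
        if (_approximate_jni_buffer_size > 0) {
            // hdfsHFlush pushes this writer's client-side buffer to the datanodes,
            // after which the JVM memory attributed to it can be released.
            if (hdfsHFlush(_hdfs_handler->hdfs_fs, _hdfs_file) != 0) {
                return Status::InternalError<false>("failed to flush hdfs file"); // simplified
            }
            _flush_and_reset_approximate_jni_buffer_size();
        }
        // retry once our own buffered bytes have been released
        RETURN_IF_ERROR(g_hdfs_write_rate_limiter.acquire_memory(actual_size));
    }
    _approximate_jni_buffer_size += actual_size;
    return Status::OK();
}
```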
##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -46,6 +49,52 @@ bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_writt
static constexpr size_t MB = 1024 * 1024;
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is currently being used.
+// The HdfsWriteRateLimit class increments a recorded value during hdfsWrite when writing to HDFS.
+// When hdfsCloseFile is called, all related memory in the JVM will be invalidated, so the recorded value can be decreased at that time.
+// If the current usage exceeds the maximum set by the user, the current write will sleep.
+// If the number of sleeps exceeds the number specified by the user, then the current write is considered to have failed
+class HdfsWriteRateLimit {
+public:
+ HdfsWriteRateLimit()
+ : max_jvm_heap_size(JniUtil::get_max_jni_heap_memory_size()),
+ cur_memory_comsuption(0) {}
+ size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size * config::max_hdfs_jni_heap_usage_ratio);
+ }
+ Status do_rate_limit(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+ return Status::OK();
+#endif
+        for (int retry_time = config::hsfs_jni_write_max_retry_time; retry_time != 0;
+ retry_time--) {
+ if (cur_memory_comsuption + memory_size > max_usage()) {
+ std::this_thread::sleep_for(
+                    std::chrono::milliseconds(config::max_hdfs_jni_write_sleep_milliseconds));
+ continue;
+ }
+ cur_memory_comsuption.fetch_add(memory_size);
+ return Status::OK();
+ }
+ return Status::InternalError("Run out of Jni jvm heap space");
+ }
+
+ void release_jni_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+ return;
+#endif
+ cur_memory_comsuption.fetch_sub(memory_size);
Review Comment:
```suggestion
        cur_memory_comsuption.fetch_sub(memory_size, std::memory_order_relaxed);
```
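
For readers unfamiliar with the ordering argument: in this version `cur_memory_comsuption` is only a running byte counter and nothing else is synchronized through it in the diff, so relaxed ordering is sufficient. A standalone illustration of the pattern with hypothetical names:

```cpp
#include <atomic>
#include <cstddef>

// Illustrative counter: the atomic only tracks a byte count and does not
// publish any other data, so acquire/release ordering buys nothing here.
struct JniMemCounter {
    std::atomic<size_t> used {0};

    void add(size_t n) { used.fetch_add(n, std::memory_order_relaxed); }
    void sub(size_t n) { used.fetch_sub(n, std::memory_order_relaxed); }
    size_t current() const { return used.load(std::memory_order_relaxed); }
};
```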
##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -45,6 +49,68 @@ bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_written");
static constexpr size_t MB = 1024 * 1024;
+static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
+
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is currently being used.
+// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS.
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile
+// which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller.
+// HdfsWriteMemUsageRecorder would reduce the mem usage at that time.
+// If the current usage exceeds the maximum set by the user, the current mem acquire would return failure.
+// The caller could do sleep to wait for free memory.
+class HdfsWriteMemUsageRecorder {
+public:
+ HdfsWriteMemUsageRecorder() = default;
+ ~HdfsWriteMemUsageRecorder() = default;
+ size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size() *
+                                   config::max_hdfs_wirter_jni_heap_usage_ratio);
+ }
+ Status acquire_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+ return Status::OK();
+#elif BE_TEST
+ return Status::OK();
+#else
+
+ std::unique_lock lck {cur_memory_latch};
+        cv.wait_for(lck, std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds),
+                    [&]() { return cur_memory_comsuption + memory_size <= max_usage(); });
+ if (cur_memory_comsuption + memory_size > max_usage()) {
+ return Status::InternalError(
Review Comment:
`lck.unlock()` before return `InternalError`
##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -45,6 +49,68 @@ bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_written");
static constexpr size_t MB = 1024 * 1024;
+static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
+
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is currently being used.
+// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS.
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile
+// which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller.
+// HdfsWriteMemUsageRecorder would reduce the mem usage at that time.
+// If the current usage exceeds the maximum set by the user, the current mem acquire would return failure.
+// The caller could do sleep to wait for free memory.
+class HdfsWriteMemUsageRecorder {
+public:
+ HdfsWriteMemUsageRecorder() = default;
+ ~HdfsWriteMemUsageRecorder() = default;
+ size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size() *
+                                   config::max_hdfs_wirter_jni_heap_usage_ratio);
+ }
+ Status acquire_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+ return Status::OK();
+#elif BE_TEST
+ return Status::OK();
+#else
+
+ std::unique_lock lck {cur_memory_latch};
+        cv.wait_for(lck, std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds),
+                    [&]() { return cur_memory_comsuption + memory_size <= max_usage(); });
+ if (cur_memory_comsuption + memory_size > max_usage()) {
+ return Status::InternalError(
Review Comment:
Maybe stacktrace should be disabled here, use `InternalError<false>`
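
Putting the comments on this hunk together (release the latch before building the error, and use `InternalError<false>` so no stacktrace is captured for an expected failure), the acquire path might end up roughly as below. This is a sketch of the combined feedback using the member names from the diff, not the final code of the PR:

```cpp
// Sketch combining the review feedback on acquire_memory.
Status acquire_memory(size_t memory_size) {
    std::unique_lock lck {cur_memory_latch};
    cv.wait_for(lck, std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds),
                [&]() { return cur_memory_comsuption + memory_size <= max_usage(); });
    if (cur_memory_comsuption + memory_size > max_usage()) {
        lck.unlock(); // don't hold cur_memory_latch while formatting the Status
        return Status::InternalError<false>(
                "Run out of Jni jvm heap space, current limit size is {}, max heap size is {}, "
                "ratio is {}",
                max_usage(), max_jvm_heap_size(), config::max_hdfs_wirter_jni_heap_usage_ratio);
    }
    cur_memory_comsuption += memory_size;
    return Status::OK();
}
```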
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]