This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5f62a4462dd [Enhancement](wal) Add wal space back pressure (#26483)
5f62a4462dd is described below
commit 5f62a4462dd670d83cd09349a31125c21686fc71
Author: abmdocrt <[email protected]>
AuthorDate: Thu Nov 9 12:29:05 2023 +0800
[Enhancement](wal) Add wal space back pressure (#26483)
---
be/src/common/config.cpp | 7 +++++--
be/src/common/config.h | 3 +++
be/src/olap/wal_manager.cpp | 24 +++++++++++++++++++++++-
be/src/olap/wal_manager.h | 4 ++++
be/src/olap/wal_writer.cpp | 20 +++++++++++++++++++-
be/src/olap/wal_writer.h | 13 ++++++++++++-
be/test/olap/wal_manager_test.cpp | 4 +++-
be/test/olap/wal_reader_writer_test.cpp | 5 ++++-
8 files changed, 73 insertions(+), 7 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 29af4d5059a..0133a78ecf9 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1118,8 +1118,11 @@ DEFINE_Bool(ignore_always_true_predicate_for_segment,
"true");
// Dir of default timezone files
DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
-// Max size(bytes) of group commit queues, used for mem back pressure.
-DEFINE_Int32(group_commit_max_queue_size, "65536");
+// Max size(bytes) of group commit queues, used for mem back pressure, default 64M.
+DEFINE_Int32(group_commit_max_queue_size, "67108864");
+
+// Max size(bytes) of wal disk usage, used for disk space back pressure, default 64M.
+DEFINE_Int32(wal_max_disk_size, "67108864");
// Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f0fdf58558c..d9276aef9a5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1195,6 +1195,9 @@ DECLARE_String(default_tzfiles_path);
// Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_max_queue_size);
+// Max size(bytes) of wal disk using, used for disk space back pressure.
+DECLARE_Int32(wal_max_disk_size);
+
// Ingest binlog work pool size
DECLARE_Int32(ingest_binlog_work_pool_size);
diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp
index cf84c4a9661..921f7da8e38 100644
--- a/be/src/olap/wal_manager.cpp
+++ b/be/src/olap/wal_manager.cpp
@@ -19,10 +19,15 @@
#include <thrift/protocol/TDebugProtocol.h>
+#include <atomic>
#include <chrono>
+#include <cstdint>
#include <filesystem>
+#include <memory>
+#include <utility>
#include "io/fs/local_file_system.h"
+#include "olap/wal_writer.h"
#include "runtime/client_cache.h"
#include "runtime/fragment_mgr.h"
#include "runtime/plan_fragment_executor.h"
@@ -35,11 +40,13 @@ namespace doris {
WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
: _exec_env(exec_env), _stop_background_threads_latch(1) {
doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs);
+ _all_wal_disk_bytes = std::make_shared<std::atomic_size_t>(0);
}
WalManager::~WalManager() {
LOG(INFO) << "WalManager is destoried";
}
+
void WalManager::stop() {
_stop = true;
_stop_background_threads_latch.count_down();
@@ -117,8 +124,12 @@ Status WalManager::create_wal_writer(int64_t wal_id,
std::shared_ptr<WalWriter>&
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path));
}
LOG(INFO) << "create wal " << wal_path;
- wal_writer = std::make_shared<WalWriter>(wal_path);
+ wal_writer = std::make_shared<WalWriter>(wal_path, _all_wal_disk_bytes);
RETURN_IF_ERROR(wal_writer->init());
+ {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
+ _wal_id_to_writer_map.emplace(wal_id, wal_writer);
+ }
return Status::OK();
}
@@ -241,6 +252,17 @@ size_t WalManager::get_wal_table_size(const std::string&
table_id) {
Status WalManager::delete_wal(int64_t wal_id) {
{
std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
+ if (_wal_id_to_writer_map.find(wal_id) != _wal_id_to_writer_map.end())
{
+ _all_wal_disk_bytes->store(
+
_all_wal_disk_bytes->fetch_sub(_wal_id_to_writer_map[wal_id]->disk_bytes(),
+ std::memory_order_relaxed),
+ std::memory_order_relaxed);
+ _wal_id_to_writer_map[wal_id]->cv.notify_one();
+ _wal_id_to_writer_map.erase(wal_id);
+ }
+ if (_wal_id_to_writer_map.empty()) {
+ CHECK_EQ(_all_wal_disk_bytes->load(std::memory_order_relaxed), 0);
+ }
std::string wal_path = _wal_path_map[wal_id];
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
LOG(INFO) << "delete file=" << wal_path;
diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h
index f5b49f6ddaf..d8fa4a70527 100644
--- a/be/src/olap/wal_manager.h
+++ b/be/src/olap/wal_manager.h
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include <memory>
+
#include "common/config.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/FrontendService_types.h"
@@ -56,6 +58,8 @@ private:
std::vector<std::string> _wal_dirs;
std::shared_mutex _wal_lock;
std::unordered_map<int64_t, std::string> _wal_path_map;
+ std::unordered_map<int64_t, std::shared_ptr<WalWriter>>
_wal_id_to_writer_map;
+ std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
bool _stop = false;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_writer.cpp b/be/src/olap/wal_writer.cpp
index 7cd427453bb..085c0f0e31f 100644
--- a/be/src/olap/wal_writer.cpp
+++ b/be/src/olap/wal_writer.cpp
@@ -17,6 +17,9 @@
#include "olap/wal_writer.h"
+#include <atomic>
+
+#include "common/config.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
@@ -25,7 +28,12 @@
namespace doris {
-WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {}
+WalWriter::WalWriter(const std::string& file_name,
+ const std::shared_ptr<std::atomic_size_t>&
all_wal_disk_bytes)
+ : _file_name(file_name),
+ _count(0),
+ _disk_bytes(0),
+ _all_wal_disk_bytes(all_wal_disk_bytes) {}
WalWriter::~WalWriter() {}
@@ -44,6 +52,12 @@ Status WalWriter::finalize() {
}
Status WalWriter::append_blocks(const PBlockArray& blocks) {
+ {
+ std::unique_lock l(_mutex);
+ while (_all_wal_disk_bytes->load(std::memory_order_relaxed) >
config::wal_max_disk_size) {
+ cv.wait_for(l,
std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME));
+ }
+ }
size_t total_size = 0;
for (const auto& block : blocks) {
total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE;
@@ -62,6 +76,10 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) {
offset += CHECKSUM_SIZE;
}
DCHECK(offset == total_size);
+ _disk_bytes += total_size;
+ _all_wal_disk_bytes->store(
+ _all_wal_disk_bytes->fetch_add(total_size,
std::memory_order_relaxed),
+ std::memory_order_relaxed);
// write rows
RETURN_IF_ERROR(_file_writer->append({row_binary, offset}));
_count++;
diff --git a/be/src/olap/wal_writer.h b/be/src/olap/wal_writer.h
index 12fd84f258f..9fd0a396788 100644
--- a/be/src/olap/wal_writer.h
+++ b/be/src/olap/wal_writer.h
@@ -17,9 +17,13 @@
#pragma once
+#include <atomic>
+#include <memory>
+
#include "common/status.h"
#include "gen_cpp/internal_service.pb.h"
#include "io/fs/file_reader_writer_fwd.h"
+#include "util/lock.h"
namespace doris {
@@ -27,23 +31,30 @@ using PBlockArray = std::vector<PBlock*>;
class WalWriter {
public:
- explicit WalWriter(const std::string& file_name);
+ explicit WalWriter(const std::string& file_name,
+ const std::shared_ptr<std::atomic_size_t>&
all_wal_disk_bytes);
~WalWriter();
Status init();
Status finalize();
Status append_blocks(const PBlockArray& blocks);
+ size_t disk_bytes() const { return _disk_bytes; };
std::string file_name() { return _file_name; };
static const int64_t LENGTH_SIZE = 8;
static const int64_t CHECKSUM_SIZE = 4;
+ doris::ConditionVariable cv;
private:
+ static constexpr size_t MAX_WAL_WRITE_WAIT_TIME = 1000;
std::string _file_name;
io::FileWriterPtr _file_writer;
int64_t _count;
int64_t _batch;
+ size_t _disk_bytes;
+ std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
+ doris::Mutex _mutex;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/test/olap/wal_manager_test.cpp
b/be/test/olap/wal_manager_test.cpp
index fbb5fdbf03f..ec387680a63 100644
--- a/be/test/olap/wal_manager_test.cpp
+++ b/be/test/olap/wal_manager_test.cpp
@@ -78,7 +78,9 @@ public:
void prepare() {
static_cast<void>(io::global_local_filesystem()->create_directory(wal_dir)); }
void createWal(const std::string& wal_path) {
- auto wal_writer = WalWriter(wal_path);
+ std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes =
+ std::make_shared<std::atomic_size_t>(0);
+ auto wal_writer = WalWriter(wal_path, _all_wal_disk_bytes);
static_cast<void>(wal_writer.init());
static_cast<void>(wal_writer.finalize());
}
diff --git a/be/test/olap/wal_reader_writer_test.cpp
b/be/test/olap/wal_reader_writer_test.cpp
index 71c822013a2..09460477e38 100644
--- a/be/test/olap/wal_reader_writer_test.cpp
+++ b/be/test/olap/wal_reader_writer_test.cpp
@@ -17,6 +17,7 @@
#include <gtest/gtest.h>
#include <filesystem>
+#include <memory>
#include "agent/be_exec_version_manager.h"
#include "common/object_pool.h"
@@ -89,7 +90,9 @@ void generate_block(PBlock& pblock, int row_index) {
TEST_F(WalReaderWriterTest, TestWriteAndRead1) {
std::string file_name = _s_test_data_path + "/abcd123.txt";
- auto wal_writer = WalWriter(file_name);
+ std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes =
+ std::make_shared<std::atomic_size_t>(0);
+ auto wal_writer = WalWriter(file_name, _all_wal_disk_bytes);
static_cast<void>(wal_writer.init());
size_t file_len = 0;
int64_t file_size = -1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]