This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9ff8bd2e9c9 [Enhancement](Wal)Support dynamic wal space limit (#27726)
9ff8bd2e9c9 is described below
commit 9ff8bd2e9c995760b740800addd42eddfa8e25df
Author: abmdocrt <[email protected]>
AuthorDate: Wed Dec 27 11:51:32 2023 +0800
[Enhancement](Wal)Support dynamic wal space limit (#27726)
---
be/src/common/config.cpp | 7 +-
be/src/common/config.h | 5 +-
be/src/http/action/http_stream.cpp | 55 +++--
be/src/http/action/stream_load.cpp | 64 ++++--
be/src/http/utils.cpp | 7 +-
be/src/io/fs/local_file_system.cpp | 13 ++
be/src/io/fs/local_file_system.h | 1 +
be/src/olap/wal_dirs_info.cpp | 224 +++++++++++++++++++++
be/src/olap/wal_dirs_info.h | 85 ++++++++
be/src/olap/wal_manager.cpp | 165 ++++++++++++---
be/src/olap/wal_manager.h | 37 +++-
be/src/olap/wal_writer.cpp | 28 +--
be/src/olap/wal_writer.h | 10 +-
be/src/runtime/exec_env.h | 1 +
be/src/runtime/group_commit_mgr.cpp | 92 ++++++++-
be/src/runtime/group_commit_mgr.h | 24 ++-
be/src/vec/sink/group_commit_block_sink.cpp | 30 ++-
be/src/vec/sink/group_commit_block_sink.h | 2 +-
be/src/vec/sink/writer/vwal_writer.cpp | 10 +-
be/src/vec/sink/writer/vwal_writer.h | 3 +-
be/test/http/stream_load_test.cpp | 13 +-
be/test/olap/wal_manager_test.cpp | 101 +++++++++-
be/test/olap/wal_reader_writer_test.cpp | 5 +-
be/test/vec/exec/vtablet_sink_test.cpp | 2 +-
be/test/vec/exec/vwal_scanner_test.cpp | 7 +-
.../test_group_commit_and_wal_back_pressure.groovy | 6 +-
.../stream_load/test_group_commit_wal_limit.groovy | 76 +------
27 files changed, 846 insertions(+), 227 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9a886ff19c9..358db99e592 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1099,7 +1099,7 @@ DEFINE_Int32(grace_shutdown_wait_seconds, "120");
 DEFINE_Int16(bitmap_serialize_version, "1");
 // group commit insert config
-DEFINE_String(group_commit_wal_path, "./wal");
+DEFINE_String(group_commit_wal_path, "");
 DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
 DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
 DEFINE_Int32(group_commit_relay_wal_threads, "10");
@@ -1128,8 +1128,9 @@ DEFINE_String(default_tzfiles_path,
                "${DORIS_HOME}/zoneinfo");
 // Max size(bytes) of group commit queues, used for mem back pressure, defult 64M.
 DEFINE_Int32(group_commit_max_queue_size, "67108864");
-// Max size(bytes) of wal disk using, used for disk space back pressure, default 64M.
-DEFINE_Int32(wal_max_disk_size, "67108864");
+// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
+// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
+// Mutable (mString, matching its DECLARE_mString declaration): WalManager re-reads it at runtime to
+// recompute each WAL dir's limit, so changes must be allowed without a restart.
+DEFINE_mString(group_commit_wal_max_disk_limit, "10%");
 // Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
 DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index bb341002902..a41a3d06141 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1203,8 +1203,9 @@ DECLARE_String(default_tzfiles_path);
 // Max size(bytes) of group commit queues, used for mem back pressure.
 DECLARE_Int32(group_commit_max_queue_size);
-// Max size(bytes) of wal disk using, used for disk space back pressure.
-DECLARE_Int32(wal_max_disk_size);
+// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
+// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
+DECLARE_mString(group_commit_wal_max_disk_limit);
 // Ingest binlog work pool size
 DECLARE_Int32(ingest_binlog_work_pool_size);
diff --git a/be/src/http/action/http_stream.cpp
b/be/src/http/action/http_stream.cpp
index 8c75f6be740..18256237a81 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -46,6 +46,7 @@
#include "http/utils.h"
#include "io/fs/stream_load_pipe.h"
#include "olap/storage_engine.h"
+#include "olap/wal_manager.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@@ -169,31 +170,38 @@ int HttpStreamAction::on_header(HttpRequest* req) {
     Status st = Status::OK();
     std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
-    if (iequal(group_commit_mode, "off_mode")) {
-        group_commit_mode = "";
-    }
     if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
         !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
         st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
-        if (iequal(group_commit_mode, "off_mode")) {
-            group_commit_mode = "";
-        }
+    } else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
+        // off_mode and empty: normalize to "off_mode" and disable group commit.
+        group_commit_mode = "off_mode";
+        ctx->group_commit = false;
+    } else {
+        // sync_mode and async_mode
+        ctx->group_commit = true;
     }
+    // NOTE(review): past this point group_commit_mode is never empty (an absent header is
+    // normalized to "off_mode" above), so the `!group_commit_mode.empty()` term below is always
+    // true — confirm this is intended.
     ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
     auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
     auto partitions = !req->header(HTTP_PARTITIONS).empty();
     if (!temp_partitions && !partitions && !ctx->two_phase_commit &&
         (!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) {
-        if (config::wait_internal_group_commit_finish) {
+        if (iequal(group_commit_mode, "async_mode") || config::wait_internal_group_commit_finish) {
             ctx->group_commit = true;
-        } else {
-            ctx->group_commit = load_size_smaller_than_wal_limit(req);
-            if (!ctx->group_commit) {
-                LOG(WARNING) << "The data size for this http load("
-                             << req->header(HttpHeaders::CONTENT_LENGTH)
-                             << " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
-                             << config::wal_max_disk_size * 0.8
-                             << " Bytes). Please set this load to \"group commit\"=false.";
+            // Keep async_mode only when the body fits the WAL budget; otherwise the local
+            // group_commit_mode is switched to sync_mode.
+            group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode";
+            if (iequal(group_commit_mode, "sync_mode")) {
+                size_t max_available_size =
+                        ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
+                // NOTE(review): the message says the load is switched to sync_mode, yet `st` is set
+                // to EXCEEDED_LIMIT below, which presumably rejects the request — confirm intent.
+                LOG(INFO) << "When enable group commit, the data size can't be too large. The data "
+                             "size "
+                             "for this http load("
+                          << (req->header(HttpHeaders::CONTENT_LENGTH).empty()
+                                      ? 0
+                                      : std::stol(req->header(HttpHeaders::CONTENT_LENGTH)))
+                          << " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
+                          << max_available_size
+                          << " Bytes). So we set this load to \"group commit\"=sync_mode "
+                             "automatically.";
                 st = Status::Error<EXCEEDED_LIMIT>("Http load size too large.");
             }
         }
@@ -358,6 +366,23 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
     ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
     ctx->label = ctx->put_result.params.import_label;
     ctx->put_result.params.__set_wal_id(ctx->wal_id);
+    // Register the expected WAL size of async group-commit loads with GroupCommitMgr.
+    // The mode must be compared against "async_mode" exactly as on_header() validates it
+    // (case-insensitive, underscore): a "async mode" literal never matches a valid header.
+    if (iequal(http_req->header(HTTP_GROUP_COMMIT), "async_mode")) {
+        if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
+            size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+            // Compressed CSV bodies grow when decoded; the 3x factor is presumably an estimated
+            // expansion ratio rather than an exact bound.
+            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
+                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
+                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+                content_length *= 3;
+            }
+            RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
+                    ctx->id.to_thrift(), content_length));
+        }
+    }
     return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
 }
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index 04c29a53027..eddf31856a0 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -32,6 +32,7 @@
#include <thrift/protocol/TDebugProtocol.h>
#include <time.h>
+#include <algorithm>
#include <future>
#include <map>
#include <sstream>
@@ -189,15 +190,16 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     ctx->label = req->header(HTTP_LABEL_KEY);
     Status st = Status::OK();
     std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
-    if (iequal(group_commit_mode, "off_mode")) {
-        group_commit_mode = "";
-    }
     if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
         !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
         st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
-        if (iequal(group_commit_mode, "off_mode")) {
-            group_commit_mode = "";
-        }
+    } else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
+        // off_mode and empty: normalize to "off_mode" and disable group commit.
+        group_commit_mode = "off_mode";
+        ctx->group_commit = false;
+    } else {
+        // sync_mode and async_mode
+        ctx->group_commit = true;
     }
     auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
                            iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
@@ -206,19 +208,26 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     auto partitions = !req->header(HTTP_PARTITIONS).empty();
+    // NOTE(review): group_commit_mode is never empty here (an absent header is normalized to
+    // "off_mode" above), so the `!group_commit_mode.empty()` term is always true — confirm.
     if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit &&
         (!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) {
-        if (!group_commit_mode.empty() && !ctx->label.empty()) {
+        // A user label is rejected only for explicitly requested group commit (not when it is
+        // forced on by config::wait_internal_group_commit_finish).
+        if (!config::wait_internal_group_commit_finish && ctx->group_commit &&
+            !ctx->label.empty()) {
             st = Status::InternalError("label and group_commit can't be set at the same time");
         }
-        if (config::wait_internal_group_commit_finish) {
+        if (iequal(group_commit_mode, "async_mode") || config::wait_internal_group_commit_finish) {
             ctx->group_commit = true;
-        } else {
-            ctx->group_commit = load_size_smaller_than_wal_limit(req);
-            if (!ctx->group_commit) {
-                LOG(WARNING) << "The data size for this stream load("
-                             << req->header(HttpHeaders::CONTENT_LENGTH)
-                             << " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
-                             << config::wal_max_disk_size * 0.8
-                             << " Bytes). Please set this load to \"group commit\"=false.";
+            // Keep async_mode only when the body fits the WAL budget; otherwise the local
+            // group_commit_mode is switched to sync_mode.
+            group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode";
+            if (iequal(group_commit_mode, "sync_mode")) {
+                size_t max_available_size =
+                        ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
+                // NOTE(review): the message says the load is switched to sync_mode, yet `st` is set
+                // to EXCEEDED_LIMIT below, which presumably rejects the request — confirm intent.
+                LOG(INFO) << "When enable group commit, the data size can't be too large. The data "
+                             "size "
+                             "for this stream load("
+                          << (req->header(HttpHeaders::CONTENT_LENGTH).empty()
+                                      ? 0
+                                      : std::stol(req->header(HttpHeaders::CONTENT_LENGTH)))
+                          << " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
+                          << max_available_size
+                          << " Bytes). So we set this load to \"group commit\"=sync_mode "
+                             "automatically.";
                 st = Status::Error<EXCEEDED_LIMIT>("Stream load size too large.");
             }
         }
@@ -640,11 +649,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         request.__set_memtable_on_sink_node(value);
     }
     if (ctx->group_commit) {
-        if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
-            request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
-        } else {
-            request.__set_group_commit_mode("sync_mode");
-        }
+        // ctx->group_commit can be forced on by config::wait_internal_group_commit_finish even
+        // when the client sent no group_commit header; keep the "sync_mode" fallback instead of
+        // forwarding an empty mode string to the planner.
+        std::string group_commit_mode = http_req->header(HTTP_GROUP_COMMIT);
+        request.__set_group_commit_mode(group_commit_mode.empty() ? std::string("sync_mode")
+                                                                  : group_commit_mode);
     }
 #ifndef BE_TEST
@@ -665,6 +670,23 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
         return plan_status;
     }
+    // Register the expected WAL size of async group-commit loads with GroupCommitMgr. The mode
+    // is matched case-insensitively, the same way on_header() validates it.
+    if (iequal(http_req->header(HTTP_GROUP_COMMIT), "async_mode")) {
+        if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
+            size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+            // Compressed CSV bodies grow when decoded; the 3x factor is presumably an estimated
+            // expansion ratio rather than an exact bound.
+            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
+                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
+                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+                content_length *= 3;
+            }
+            RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
+                    ctx->id.to_thrift(), content_length));
+        }
+    }
     VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
     // if we not use streaming, we must download total content before we begin
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index 0d66887663f..9fbf4c79300 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -22,6 +22,7 @@
#include <sys/stat.h>
#include <unistd.h>
+#include <algorithm>
#include <memory>
#include <ostream>
#include <vector>
@@ -38,6 +39,8 @@
#include "http/http_status.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
+#include "olap/wal_manager.h"
+#include "runtime/exec_env.h"
#include "util/path_util.h"
#include "util/url_coding.h"
@@ -199,8 +202,8 @@ bool load_size_smaller_than_wal_limit(HttpRequest* req) {
     // these blocks within the limited space. So we need to set group_commit = false to avoid dead lock.
     if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
         size_t body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
-        // TODO(Yukang): change it to WalManager::wal_limit
-        return (body_bytes <= config::wal_max_disk_size * 0.8) && (body_bytes != 0);
+        // Accept only non-empty bodies strictly below 80% of the largest per-dir WAL budget
+        // currently available (WalManager::get_max_available_size()).
+        size_t max_available_size = ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
+        return (body_bytes != 0 && body_bytes < 0.8 * max_available_size);
     } else {
         return false;
     }
diff --git a/be/src/io/fs/local_file_system.cpp
b/be/src/io/fs/local_file_system.cpp
index 83e1554fe6b..a71708ac72c 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -172,6 +172,19 @@ Status LocalFileSystem::file_size_impl(const Path& file,
int64_t* file_size) con
return Status::OK();
}
+// Recursively sums the sizes of all regular files under `dir_path` into `*dir_size`.
+// Only the std::error_code overloads of std::filesystem are used, so failures are reported as a
+// Status instead of escaping as std::filesystem::filesystem_error. Entries that vanish or cannot be
+// stat'ed during the walk (e.g. a WAL file deleted concurrently) are skipped.
+// Returns IOError if `dir_path` is not an existing directory or the traversal itself fails.
+Status LocalFileSystem::directory_size(const Path& dir_path, size_t* dir_size) {
+    *dir_size = 0;
+    std::error_code ec;
+    if (!std::filesystem::is_directory(dir_path, ec)) {
+        return Status::IOError("failed to get dir size {}", dir_path.native());
+    }
+    const std::filesystem::recursive_directory_iterator end;
+    for (std::filesystem::recursive_directory_iterator it(dir_path, ec); !ec && it != end;
+         it.increment(ec)) {
+        std::error_code entry_ec;
+        if (!it->is_regular_file(entry_ec)) {
+            continue;
+        }
+        const auto file_bytes = it->file_size(entry_ec);
+        if (!entry_ec) {
+            *dir_size += file_bytes;
+        }
+    }
+    if (ec) {
+        return Status::IOError("failed to get dir size {}: {}", dir_path.native(), ec.message());
+    }
+    return Status::OK();
+}
+
Status LocalFileSystem::list_impl(const Path& dir, bool only_file,
std::vector<FileInfo>* files,
bool* exists) {
RETURN_IF_ERROR(exists_impl(dir, exists));
diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h
index 53df0b8e06b..4f7e5107fb3 100644
--- a/be/src/io/fs/local_file_system.h
+++ b/be/src/io/fs/local_file_system.h
@@ -78,6 +78,7 @@ public:
     // "safe" means the path will be concat with the path prefix config::user_files_secure_path,
     // so that it can not list any files outside the config::user_files_secure_path
     Status safe_glob(const std::string& path, std::vector<FileInfo>* res);
+    // Recursively sums the sizes of the regular files under dir_path into *dir_size.
+    // Returns an error if dir_path is not an existing directory.
+    Status directory_size(const Path& dir_path, size_t* dir_size);
protected:
Status create_file_impl(const Path& file, FileWriterPtr* writer,
diff --git a/be/src/olap/wal_dirs_info.cpp b/be/src/olap/wal_dirs_info.cpp
new file mode 100644
index 00000000000..340d896a8c6
--- /dev/null
+++ b/be/src/olap/wal_dirs_info.cpp
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/wal_dirs_info.h"
+
+#include <memory>
+#include <mutex>
+#include <shared_mutex>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "io/fs/local_file_system.h"
+#include "util/parse_util.h"
+
+namespace doris {
+
+// The directory path is only assigned by the constructor, so it is read without taking _lock.
+std::string WalDirInfo::get_wal_dir() {
+    return _wal_dir;
+}
+
+// Byte limit of this WAL dir.
+size_t WalDirInfo::get_limit() {
+    std::shared_lock<std::shared_mutex> read_guard(_lock);
+    return _limit;
+}
+
+// Bytes currently used by this WAL dir.
+size_t WalDirInfo::get_used() {
+    std::shared_lock<std::shared_mutex> read_guard(_lock);
+    return _used;
+}
+
+// Bytes pre-allocated in this WAL dir.
+size_t WalDirInfo::get_pre_allocated() {
+    std::shared_lock<std::shared_mutex> read_guard(_lock);
+    return _pre_allocated;
+}
+
+// Overwrites the byte limit of this WAL dir.
+Status WalDirInfo::set_limit(size_t limit) {
+    {
+        std::unique_lock<std::shared_mutex> write_guard(_lock);
+        _limit = limit;
+    }
+    return Status::OK();
+}
+
+// Overwrites the number of bytes this WAL dir currently uses.
+Status WalDirInfo::set_used(size_t used) {
+    {
+        std::unique_lock<std::shared_mutex> write_guard(_lock);
+        _used = used;
+    }
+    return Status::OK();
+}
+
+// Adds `pre_allocated` bytes to this dir's pre-allocation when `is_add_pre_allocated` is true,
+// and removes them otherwise. Removal saturates at zero: an unmatched or oversized release must not
+// wrap the unsigned counter around to a huge value, which would corrupt available().
+Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) {
+    std::unique_lock wlock(_lock);
+    if (is_add_pre_allocated) {
+        _pre_allocated += pre_allocated;
+    } else {
+        _pre_allocated = pre_allocated > _pre_allocated ? 0 : _pre_allocated - pre_allocated;
+    }
+    return Status::OK();
+}
+
+// Remaining budget of this WAL dir: limit minus used minus pre-allocated bytes, floored at zero.
+// Read-only, so a shared lock suffices; the comparison avoids unsigned wrap-around instead of
+// relying on converting a wrapped size_t to int64_t.
+size_t WalDirInfo::available() {
+    std::shared_lock rlock(_lock);
+    const size_t reserved = _used + _pre_allocated;
+    return _limit > reserved ? _limit - reserved : 0;
+}
+
+// Sets this dir's WAL limit. Any value other than the sentinel static_cast<size_t>(-1) is stored
+// as-is. With the sentinel, the limit is recomputed from config::group_commit_wal_max_disk_limit
+// and the disk's current free space. For percentage limits the bytes already held by this WAL dir
+// are added back, because they no longer count as free space. This matches how
+// WalManager::_init_wal_dirs_info() computes the initial limit; without it the budget would shrink
+// on every refresh as the WAL grows.
+Status WalDirInfo::update_wal_dir_limit(size_t limit) {
+    if (limit != static_cast<size_t>(-1)) {
+        return set_limit(limit);
+    }
+    size_t available_bytes;
+    size_t disk_capacity_bytes;
+    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_wal_dir, &disk_capacity_bytes,
+                                                                  &available_bytes));
+    bool is_percent = true;
+    int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, -1,
+                                                       available_bytes, &is_percent);
+    if (wal_disk_limit <= 0) {
+        return Status::InternalError("Disk full! Please check your disk usage!");
+    }
+    size_t wal_dir_size = 0;
+    RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size));
+    if (is_percent) {
+        wal_disk_limit += wal_dir_size;
+    }
+    return set_limit(wal_disk_limit);
+}
+
+// Sets the used bytes of this dir. The sentinel static_cast<size_t>(-1) means "measure the
+// directory on disk"; a failed measurement is returned without touching the stored value.
+Status WalDirInfo::update_wal_dir_used(size_t used) {
+    if (used == static_cast<size_t>(-1)) {
+        size_t measured_bytes = 0;
+        RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &measured_bytes));
+        used = measured_bytes;
+    }
+    RETURN_IF_ERROR(set_used(used));
+    return Status::OK();
+}
+
+// Adds (or removes, when is_add_pre_allocated is false) pre-allocated bytes for this dir.
+Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) {
+    return set_pre_allocated(pre_allocated, is_add_pre_allocated);
+}
+
+// Registers a new WAL dir with its initial limit, used and pre-allocated bytes. Fails if the dir is
+// already registered (silently accepted under BE_TEST). The duplicate scan and the insert run under
+// one exclusive lock, so the vector is never read while another thread appends to it and two
+// concurrent callers cannot both register the same dir.
+Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used,
+                        size_t pre_allocated) {
+    std::unique_lock wlock(_lock);
+    for (const auto& it : _wal_dirs_info_vec) {
+        if (it->get_wal_dir() == wal_dir) {
+#ifdef BE_TEST
+            return Status::OK();
+#endif
+            return Status::InternalError("wal dir {} exists!", wal_dir);
+        }
+    }
+    _wal_dirs_info_vec.emplace_back(
+            std::make_shared<WalDirInfo>(wal_dir, limit, used, pre_allocated));
+    return Status::OK();
+}
+
+// Unregisters every WAL dir.
+Status WalDirsInfo::clear() {
+    {
+        std::unique_lock<std::shared_mutex> write_guard(_lock);
+        _wal_dirs_info_vec.clear();
+    }
+    return Status::OK();
+}
+
+// Picks the WAL dir for a new WAL file.
+// - A single registered dir is always returned.
+// - Otherwise a dir is chosen at random among those whose available space exceeds 20% of their
+//   limit. The index is taken over that qualifying subset, not over all dirs, so a nearly-full
+//   dir cannot be selected by chance.
+// - If no dir qualifies, the dir with the MOST available space is returned.
+// Returns an empty string when no dir is registered.
+std::string WalDirsInfo::get_available_random_wal_dir() {
+    std::shared_lock rlock(_lock);
+    if (_wal_dirs_info_vec.empty()) {
+        return "";
+    }
+    if (_wal_dirs_info_vec.size() == 1) {
+        return _wal_dirs_info_vec.front()->get_wal_dir();
+    }
+    std::vector<std::string> available_wal_dirs;
+    WalDirInfo* most_available = nullptr;
+    size_t most_available_bytes = 0;
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        const size_t available = wal_dir_info->available();
+        if (available > wal_dir_info->get_limit() * 0.2) {
+            available_wal_dirs.emplace_back(wal_dir_info->get_wal_dir());
+        }
+        if (most_available == nullptr || available > most_available_bytes) {
+            most_available = wal_dir_info.get();
+            most_available_bytes = available;
+        }
+    }
+    if (available_wal_dirs.empty()) {
+        return most_available->get_wal_dir();
+    }
+    return available_wal_dirs[rand() % available_wal_dirs.size()];
+}
+
+// Largest available() among the registered WAL dirs, or 0 when none is registered. The original
+// code dereferenced max_element's end() for an empty vector. Each dir's available() is now read
+// exactly once, so the result is consistent even if it changes concurrently.
+size_t WalDirsInfo::get_max_available_size() {
+    std::shared_lock rlock(_lock);
+    size_t max_available = 0;
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        const size_t available = wal_dir_info->available();
+        if (available > max_available) {
+            max_available = available;
+        }
+    }
+    return max_available;
+}
+
+// The functions below scan _wal_dirs_info_vec, which add() and clear() mutate under the exclusive
+// lock. Each therefore holds the shared lock while scanning, as get_wal_dir_available_size() does.
+// Per-dir counters are protected by each WalDirInfo's own lock.
+
+// Updates the limit of `wal_dir`; limit == -1 recomputes it from disk and config.
+Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) {
+    std::shared_lock rlock(_lock);
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        if (wal_dir_info->get_wal_dir() == wal_dir) {
+            return wal_dir_info->update_wal_dir_limit(limit);
+        }
+    }
+    return Status::InternalError("Can not find wal dir in wal disks info.");
+}
+
+// Recomputes the limit of every dir. One failing dir does not stop the others from being
+// refreshed; the first error encountered is returned.
+Status WalDirsInfo::update_all_wal_dir_limit() {
+    std::shared_lock rlock(_lock);
+    Status first_error = Status::OK();
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        Status st = wal_dir_info->update_wal_dir_limit(-1);
+        if (!st.ok() && first_error.ok()) {
+            first_error = std::move(st);
+        }
+    }
+    return first_error;
+}
+
+// Updates the used bytes of `wal_dir`; used == -1 measures the directory on disk.
+Status WalDirsInfo::update_wal_dir_used(std::string wal_dir, size_t used) {
+    std::shared_lock rlock(_lock);
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        if (wal_dir_info->get_wal_dir() == wal_dir) {
+            return wal_dir_info->update_wal_dir_used(used);
+        }
+    }
+    return Status::InternalError("Can not find wal dir in wal disks info.");
+}
+
+// Re-measures the used bytes of every dir. One failing dir does not stop the others from being
+// refreshed; the first error encountered is returned.
+Status WalDirsInfo::update_all_wal_dir_used() {
+    std::shared_lock rlock(_lock);
+    Status first_error = Status::OK();
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        Status st = wal_dir_info->update_wal_dir_used(-1);
+        if (!st.ok() && first_error.ok()) {
+            first_error = std::move(st);
+        }
+    }
+    return first_error;
+}
+
+// Adds (or removes, when is_add_pre_allocated is false) pre-allocated bytes for `wal_dir`.
+Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated,
+                                                 bool is_add_pre_allocated) {
+    std::shared_lock rlock(_lock);
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        if (wal_dir_info->get_wal_dir() == wal_dir) {
+            return wal_dir_info->update_wal_dir_pre_allocated(pre_allocated, is_add_pre_allocated);
+        }
+    }
+    return Status::InternalError("Can not find wal dir in wal disks info.");
+}
+
+// Writes the available bytes of `wal_dir` into *available_bytes; errors if the dir is unknown.
+Status WalDirsInfo::get_wal_dir_available_size(const std::string& wal_dir,
+                                               size_t* available_bytes) {
+    std::shared_lock<std::shared_mutex> read_guard(_lock);
+    for (size_t idx = 0; idx < _wal_dirs_info_vec.size(); ++idx) {
+        const auto& info = _wal_dirs_info_vec[idx];
+        if (info->get_wal_dir() != wal_dir) {
+            continue;
+        }
+        *available_bytes = info->available();
+        return Status::OK();
+    }
+    return Status::InternalError("can not find wal dir!");
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_dirs_info.h b/be/src/olap/wal_dirs_info.h
new file mode 100644
index 00000000000..048cd8f9564
--- /dev/null
+++ b/be/src/olap/wal_dirs_info.h
@@ -0,0 +1,85 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "common/factory_creator.h"
+#include "common/status.h"
+
+namespace doris {
+// Space accounting for a single WAL directory: a byte limit, the bytes currently used, and bytes
+// pre-allocated (presumably reserved for in-flight loads — confirm against GroupCommitMgr).
+// Counters are guarded by the internal shared_mutex _lock.
+class WalDirInfo {
+    ENABLE_FACTORY_CREATOR(WalDirInfo);
+
+public:
+    WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated)
+            : _wal_dir(std::move(wal_dir)),
+              _limit(limit),
+              _used(used),
+              _pre_allocated(pre_allocated) {}
+    std::string get_wal_dir();
+    size_t get_limit();
+    size_t get_used();
+    size_t get_pre_allocated();
+    Status set_limit(size_t limit);
+    Status set_used(size_t used);
+    // Adds pre_allocated to the counter when is_add_pre_allocated is true, subtracts it otherwise.
+    Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated);
+    // limit - used - pre_allocated, floored at zero.
+    size_t available();
+    // limit == -1 (the default) recomputes the limit from config::group_commit_wal_max_disk_limit
+    // and the disk's free space; any other value is stored as-is.
+    Status update_wal_dir_limit(size_t limit = -1);
+    // used == -1 (the default) measures the directory size on disk; any other value is stored as-is.
+    Status update_wal_dir_used(size_t used = -1);
+    Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true);
+
+private:
+    std::string _wal_dir;
+    size_t _limit;
+    size_t _used;
+    size_t _pre_allocated;
+    std::shared_mutex _lock;
+};
+
+// Registry of all WAL directories and their per-dir space accounting (see WalDirInfo).
+// _lock is meant to guard _wal_dirs_info_vec; each WalDirInfo guards its own counters.
+class WalDirsInfo {
+    ENABLE_FACTORY_CREATOR(WalDirsInfo);
+
+public:
+    WalDirsInfo() = default;
+    ~WalDirsInfo() = default;
+    // Registers a WAL dir; fails if it is already registered.
+    Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated);
+    Status clear();
+    // Chooses the WAL dir in which a new WAL file is created.
+    std::string get_available_random_wal_dir();
+    // Largest available() across the registered dirs.
+    size_t get_max_available_size();
+    // The single-dir update_* methods fail if wal_dir is not registered. A -1 argument means
+    // "recompute from disk" (see WalDirInfo).
+    Status update_wal_dir_limit(std::string wal_dir, size_t limit = -1);
+    Status update_all_wal_dir_limit();
+    Status update_wal_dir_used(std::string wal_dir, size_t used = -1);
+    Status update_all_wal_dir_used();
+    Status update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated,
+                                        bool is_add_pre_allocated);
+    Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes);
+
+private:
+    std::vector<std::shared_ptr<WalDirInfo>> _wal_dirs_info_vec;
+    std::shared_mutex _lock;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp
index 238f88ebd00..c62996da19b 100644
--- a/be/src/olap/wal_manager.cpp
+++ b/be/src/olap/wal_manager.cpp
@@ -17,6 +17,7 @@
#include "olap/wal_manager.h"
+#include <glog/logging.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <atomic>
@@ -24,14 +25,24 @@
#include <cstdint>
#include <filesystem>
#include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include <thread>
+#include <unordered_map>
#include <utility>
+#include <vector>
+#include "common/config.h"
+#include "common/status.h"
#include "io/fs/local_file_system.h"
+#include "olap/wal_dirs_info.h"
#include "olap/wal_writer.h"
#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/stream_load/stream_load_context.h"
+#include "util/parse_util.h"
#include "util/path_util.h"
#include "util/thrift_rpc_helper.h"
#include "vec/exec/format/wal/wal_reader.h"
@@ -40,12 +51,11 @@ namespace doris {
 WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
         : _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false) {
     doris::vectorized::WalReader::string_split(wal_dir_list, ";", _wal_dirs);
-    _all_wal_disk_bytes = std::make_shared<std::atomic_size_t>(0);
-    _cv = std::make_shared<std::condition_variable>();
     static_cast<void>(ThreadPoolBuilder("GroupCommitReplayWalThreadPool")
                               .set_min_threads(1)
                               .set_max_threads(config::group_commit_relay_wal_threads)
                               .build(&_thread_pool));
+    // Per-directory WAL space accounting; dirs are registered later by init().
+    _wal_dirs_info = WalDirsInfo::create_unique();
 }
 
 WalManager::~WalManager() {
@@ -60,12 +70,46 @@ void WalManager::stop() {
         if (_replay_thread) {
             _replay_thread->join();
         }
+        // Join the thread started in _init_wal_dirs_info() that refreshes WAL dir limits/usage.
+        if (_update_wal_dirs_info_thread) {
+            _update_wal_dirs_info_thread->join();
+        }
         _thread_pool->shutdown();
         LOG(INFO) << "WalManager is stopped";
     }
 }
Status WalManager::init() {
+ RETURN_IF_ERROR(_init_wal_dirs_conf());
+ RETURN_IF_ERROR(_init_wal_dirs());
+ RETURN_IF_ERROR(_init_wal_dirs_info());
+ return Thread::create(
+ "WalMgr", "replay_wal", [this]() {
static_cast<void>(this->replay()); },
+ &_replay_thread);
+}
+
+Status WalManager::_init_wal_dirs_conf() {
+ std::vector<std::string> tmp_dirs;
+ if (_wal_dirs.empty()) {
+ // default case.
+ for (const StorePath& path : ExecEnv::GetInstance()->store_paths()) {
+ tmp_dirs.emplace_back(path.path + "/wal");
+ }
+ } else {
+ // user config must be absolute path.
+ for (const std::string& wal_dir : _wal_dirs) {
+ if (std::filesystem::path(wal_dir).is_absolute()) {
+ tmp_dirs.emplace_back(wal_dir);
+ } else {
+ return Status::InternalError(
+ "BE config group_commit_replay_wal_dir has to be
absolute path!");
+ }
+ }
+ }
+ _wal_dirs = tmp_dirs;
+ return Status::OK();
+}
+
+// For each WAL dir: handles its "/tmp" sub-dir and scans existing WAL files via scan_wals().
+Status WalManager::_init_wal_dirs() {
     bool exists = false;
     for (auto wal_dir : _wal_dirs) {
         std::string tmp_dir = wal_dir + "/tmp";
@@ -80,9 +124,44 @@ Status WalManager::init() {
         }
         RETURN_IF_ERROR(scan_wals(wal_dir));
     }
+    return Status::OK();
+}
+
+// Registers every WAL dir in _wal_dirs_info with its initial limit and used bytes, then starts the
+// background thread that keeps them up to date. The limit is config::group_commit_wal_max_disk_limit
+// resolved against the disk's free space; for percentage limits, bytes already present in the dir
+// are added on top. Under BE_TEST, wal_limit_test_bytes replaces the disk query.
+Status WalManager::_init_wal_dirs_info() {
+    for (const std::string& wal_dir : _wal_dirs) {
+        size_t available_bytes;
+#ifndef BE_TEST
+        size_t disk_capacity_bytes;
+        RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(wal_dir, &disk_capacity_bytes,
+                                                                      &available_bytes));
+#else
+        available_bytes = wal_limit_test_bytes;
+#endif
+        bool is_percent = true;
+        int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, -1,
+                                                           available_bytes, &is_percent);
+        if (wal_disk_limit < 0) {
+            return Status::InternalError(
+                    "group_commit_wal_max_disk_limit config is wrong, please check your config!");
+        }
+        // if there are some wal files in wal dir, we need to add it to wal disk limit.
+        size_t wal_dir_size = 0;
+#ifndef BE_TEST
+        RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(wal_dir, &wal_dir_size));
+#endif
+        if (is_percent) {
+            wal_disk_limit += wal_dir_size;
+        }
+        RETURN_IF_ERROR(_wal_dirs_info->add(wal_dir, wal_disk_limit, wal_dir_size, 0));
+
+#ifdef BE_TEST
+        wal_limit_test_bytes = wal_disk_limit;
+#endif
+    }
     return Thread::create(
-            "WalMgr", "replay_wal", [this]() { static_cast<void>(this->replay()); },
-            &_replay_thread);
+            "WalMgr", "update_wal_dir_info",
+            [this]() { static_cast<void>(this->_update_wal_dir_info_thread()); },
+            &_update_wal_dirs_info_thread);
 }
 void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) {
@@ -158,9 +237,8 @@ void WalManager::print_wal_status_queue() {
 }
 
 Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
-                                const std::string& label) {
-    std::string base_path =
-            _wal_dirs.size() == 1 ? _wal_dirs[0] : _wal_dirs[rand() % _wal_dirs.size()];
+                                const std::string& label, std::string& base_path) {
+    // base_path is an out-parameter: the WAL dir chosen for this WAL file.
+    base_path = _wal_dirs_info->get_available_random_wal_dir();
     std::stringstream ss;
     ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/"
        << std::to_string(wal_id) << "_" << label;
@@ -209,7 +287,7 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>&
         RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path));
     }
     LOG(INFO) << "create wal " << wal_path;
-    wal_writer = std::make_shared<WalWriter>(wal_path, _all_wal_disk_bytes, _cv);
+    wal_writer = std::make_shared<WalWriter>(wal_path);
     RETURN_IF_ERROR(wal_writer->init());
     {
         std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
@@ -250,7 +328,7 @@ Status WalManager::scan_wals(const std::string& wal_path) {
                          << ", st=" << st.to_string();
             return st;
         }
-        if (wals.size() == 0) {
+        if (wals.empty()) {
             continue;
         }
         std::vector<std::string> res;
@@ -331,6 +409,12 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector<
         table_ptr = it->second;
     }
     table_ptr->add_wals(wals);
+#ifndef BE_TEST
+    // Recovered WAL files already occupy disk space, so refresh the limit and used bytes of the
+    // dir each one lives in. WAL paths are <wal_dir>/<db_id>/<table_id>/<wal file>.
+    // Iterate by reference and resolve the base dir once per file.
+    for (const auto& wal : wals) {
+        const std::string base_wal_path = _get_base_wal_path(wal);
+        RETURN_IF_ERROR(update_wal_dir_limit(base_wal_path));
+        RETURN_IF_ERROR(update_wal_dir_used(base_wal_path));
+    }
+#endif
     return Status::OK();
 }
@@ -344,31 +428,20 @@ size_t WalManager::get_wal_table_size(int64_t table_id) {
     }
 }
 
-Status WalManager::delete_wal(int64_t wal_id) {
+// Deletes the WAL file of `wal_id`, drops its writer from _wal_id_to_writer_map, and releases the
+// `block_queue_pre_allocated` bytes pre-allocated for it in its WAL dir. An unknown wal_id is a
+// no-op: with no recorded path there is no dir to release from, and looking up "" would fail.
+Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated) {
+    std::string wal_path;
     {
         std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
-        if (_wal_id_to_writer_map.find(wal_id) != _wal_id_to_writer_map.end()) {
-            _all_wal_disk_bytes->fetch_sub(_wal_id_to_writer_map[wal_id]->disk_bytes(),
-                                           std::memory_order_relaxed);
-            _cv->notify_one();
-            std::string wal_path = _wal_path_map[wal_id];
-            LOG(INFO) << "wal delete file=" << wal_path << ", this file disk usage is "
-                      << _wal_id_to_writer_map[wal_id]->disk_bytes()
-                      << " ,after deleting it, all wals disk usage is "
-                      << _all_wal_disk_bytes->load(std::memory_order_relaxed);
-            _wal_id_to_writer_map.erase(wal_id);
-        }
-        if (_wal_id_to_writer_map.empty()) {
-            CHECK_EQ(_all_wal_disk_bytes->load(std::memory_order_relaxed), 0);
-        }
+        // The writer (presumably registered by create_wal_writer()) must not outlive its WAL in the
+        // map; erasing an absent id is a no-op.
+        _wal_id_to_writer_map.erase(wal_id);
         auto it = _wal_path_map.find(wal_id);
         if (it != _wal_path_map.end()) {
-            std::string wal_path = it->second;
+            wal_path = it->second;
             RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
             LOG(INFO) << "delete file=" << wal_path;
             _wal_path_map.erase(wal_id);
         }
     }
+    if (wal_path.empty()) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path),
+                                                 block_queue_pre_allocated, false));
     return Status::OK();
 }
@@ -409,4 +482,46 @@ Status WalManager::get_wal_column_index(int64_t wal_id,
std::vector<size_t>& col
return Status::OK();
}
+size_t WalManager::get_max_available_size() {
+ return _wal_dirs_info->get_max_available_size();
+}
+
+Status WalManager::update_wal_dir_limit(const std::string& wal_dir, size_t
limit) {
+ return _wal_dirs_info->update_wal_dir_limit(wal_dir, limit);
+}
+
+Status WalManager::update_wal_dir_used(const std::string& wal_dir, size_t
used) {
+ return _wal_dirs_info->update_wal_dir_used(wal_dir, used);
+}
+
+Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir,
size_t pre_allocated,
+ bool is_add_pre_allocated) {
+ return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, pre_allocated,
+ is_add_pre_allocated);
+}
+
+Status WalManager::_update_wal_dir_info_thread() {
+ while (!_stop.load()) {
+ static_cast<void>(_wal_dirs_info->update_all_wal_dir_limit());
+ static_cast<void>(_wal_dirs_info->update_all_wal_dir_used());
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ return Status::OK();
+}
+
+Status WalManager::get_wal_dir_available_size(const std::string& wal_dir,
size_t* available_bytes) {
+ return _wal_dirs_info->get_wal_dir_available_size(wal_dir,
available_bytes);
+}
+
+std::string WalManager::_get_base_wal_path(const std::string& wal_path_str) {
+ io::Path wal_path = wal_path_str;
+ for (int i = 0; i < 3; ++i) {
+ if (!wal_path.has_parent_path()) {
+ return "";
+ }
+ wal_path = wal_path.parent_path();
+ }
+ return wal_path.string();
+}
+
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h
index 83847beabf0..8d330a64bb9 100644
--- a/be/src/olap/wal_manager.h
+++ b/be/src/olap/wal_manager.h
@@ -16,15 +16,24 @@
// under the License.
#pragma once
+
#include <gen_cpp/PaloInternalService_types.h>
+#include <atomic>
#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
#include <memory>
+#include <shared_mutex>
+#include <thread>
+#include <unordered_map>
#include "common/config.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/FrontendService_types.h"
#include "gen_cpp/HeartbeatService_types.h"
+#include "gutil/ref_counted.h"
+#include "olap/wal_dirs_info.h"
#include "olap/wal_reader.h"
#include "olap/wal_table.h"
#include "olap/wal_writer.h"
@@ -47,7 +56,7 @@ public:
public:
WalManager(ExecEnv* exec_env, const std::string& wal_dir);
~WalManager();
- Status delete_wal(int64_t wal_id);
+ Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0);
Status init();
Status scan_wals(const std::string& wal_path);
Status replay();
@@ -56,7 +65,8 @@ public:
Status scan();
size_t get_wal_table_size(int64_t table_id);
Status add_recover_wal(int64_t db_id, int64_t table_id,
std::vector<std::string> wals);
- Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const
std::string& label);
+ Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const
std::string& label,
+ std::string& base_path);
Status get_wal_path(int64_t wal_id, std::string& wal_path);
Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
PGetWalQueueSizeResponse* response);
@@ -70,10 +80,30 @@ public:
void erase_wal_column_index(int64_t wal_id);
Status get_wal_column_index(int64_t wal_id, std::vector<size_t>&
column_index);
+ Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1);
+ Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1);
+ Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t
pre_allocated,
+ bool is_add_pre_allocated);
+ Status get_wal_dir_available_size(const std::string& wal_dir, size_t*
available_bytes);
+ size_t get_max_available_size();
+
+private:
+ Status _init_wal_dirs_conf();
+ Status _init_wal_dirs();
+ Status _init_wal_dirs_info();
+ std::string _get_base_wal_path(const std::string& wal_path_str);
+ const std::string& _get_available_random_wal_dir();
+ Status _update_wal_dir_info_thread();
+
+public:
+ // used for be ut
+ size_t wal_limit_test_bytes;
+
private:
ExecEnv* _exec_env = nullptr;
std::shared_mutex _lock;
scoped_refptr<Thread> _replay_thread;
+ scoped_refptr<Thread> _update_wal_dirs_info_thread;
CountDownLatch _stop_background_threads_latch;
std::map<int64_t, std::shared_ptr<WalTable>> _table_map;
std::vector<std::string> _wal_dirs;
@@ -81,12 +111,11 @@ private:
std::shared_mutex _wal_status_lock;
std::unordered_map<int64_t, std::string> _wal_path_map;
std::unordered_map<int64_t, std::shared_ptr<WalWriter>>
_wal_id_to_writer_map;
- std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
std::unordered_map<int64_t, std::unordered_map<int64_t, WAL_STATUS>>
_wal_status_queues;
std::atomic<bool> _stop;
std::shared_mutex _wal_column_id_map_lock;
std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
- std::shared_ptr<std::condition_variable> _cv;
std::unique_ptr<doris::ThreadPool> _thread_pool;
+ std::unique_ptr<WalDirsInfo> _wal_dirs_info;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_writer.cpp b/be/src/olap/wal_writer.cpp
index 9d3da90d887..beae6ec80b6 100644
--- a/be/src/olap/wal_writer.cpp
+++ b/be/src/olap/wal_writer.cpp
@@ -26,6 +26,7 @@
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "olap/storage_engine.h"
+#include "olap/wal_manager.h"
#include "util/crc32c.h"
namespace doris {
@@ -33,14 +34,7 @@ namespace doris {
const char* k_wal_magic = "WAL1";
const uint32_t k_wal_magic_length = 4;
-WalWriter::WalWriter(const std::string& file_name,
- const std::shared_ptr<std::atomic_size_t>&
all_wal_disk_bytes,
- const std::shared_ptr<std::condition_variable>& cv)
- : cv(cv),
- _file_name(file_name),
- _disk_bytes(0),
- _all_wal_disk_bytes(all_wal_disk_bytes),
- _is_first_append_blocks(true) {}
+WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {}
WalWriter::~WalWriter() {}
@@ -58,22 +52,6 @@ Status WalWriter::finalize() {
}
Status WalWriter::append_blocks(const PBlockArray& blocks) {
- {
- if (_is_first_append_blocks) {
- _is_first_append_blocks = false;
- std::unique_lock l(_mutex);
- while (_all_wal_disk_bytes->load(std::memory_order_relaxed) >
- config::wal_max_disk_size) {
- LOG(INFO) << "First time to append blocks to wal file " <<
_file_name
- << ". Currently, all wal disk space usage is "
- <<
_all_wal_disk_bytes->load(std::memory_order_relaxed)
- << ", larger than the maximum limit " <<
config::wal_max_disk_size
- << ", so we need to wait. When any other load
finished, that wal will be "
- "removed, the space used by that wal will be
free.";
- cv->wait_for(l,
std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME));
- }
- }
- }
size_t total_size = 0;
for (const auto& block : blocks) {
total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE;
@@ -99,8 +77,6 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) {
"failed to write block to wal expected= " +
std::to_string(total_size) +
",actually=" + std::to_string(offset));
}
- _disk_bytes.fetch_add(total_size, std::memory_order_relaxed);
- _all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed);
return Status::OK();
}
diff --git a/be/src/olap/wal_writer.h b/be/src/olap/wal_writer.h
index 88ff4659769..ea8bba4f021 100644
--- a/be/src/olap/wal_writer.h
+++ b/be/src/olap/wal_writer.h
@@ -34,16 +34,13 @@ extern const uint32_t k_wal_magic_length;
class WalWriter {
public:
- explicit WalWriter(const std::string& file_name,
- const std::shared_ptr<std::atomic_size_t>&
all_wal_disk_bytes,
- const std::shared_ptr<std::condition_variable>& cv);
+ explicit WalWriter(const std::string& file_name);
~WalWriter();
Status init();
Status finalize();
Status append_blocks(const PBlockArray& blocks);
- size_t disk_bytes() const { return
_disk_bytes.load(std::memory_order_relaxed); };
Status append_header(uint32_t version, std::string col_ids);
std::string file_name() { return _file_name; };
@@ -51,17 +48,12 @@ public:
public:
static const int64_t LENGTH_SIZE = 8;
static const int64_t CHECKSUM_SIZE = 4;
- std::shared_ptr<std::condition_variable> cv;
static const int64_t VERSION_SIZE = 4;
private:
static constexpr size_t MAX_WAL_WRITE_WAIT_TIME = 1000;
std::string _file_name;
io::FileWriterPtr _file_writer;
- std::atomic_size_t _disk_bytes;
- std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
- std::mutex _mutex;
- bool _is_first_append_blocks;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4f4325a79c7..4c1060891f5 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -229,6 +229,7 @@ public:
void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) {
this->_routine_load_task_executor = r;
}
+ void set_wal_mgr(std::shared_ptr<WalManager> wm) { this->_wal_manager =
wm; }
#endif
stream_load::LoadStreamStubPool* load_stream_stub_pool() {
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 201a290a6e8..28df650f13c 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -21,14 +21,22 @@
#include <glog/logging.h>
#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include <vector>
#include "client_cache.h"
#include "common/config.h"
+#include "common/status.h"
#include "olap/wal_manager.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_state.h"
#include "util/thrift_rpc_helper.h"
+#include "vec/core/block.h"
namespace doris {
@@ -46,7 +54,6 @@ Status
LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block, bool
RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get()));
}
_all_block_queues_bytes->fetch_add(block->bytes(),
std::memory_order_relaxed);
- _single_block_queue_bytes->fetch_add(block->bytes(),
std::memory_order_relaxed);
}
_get_cond.notify_all();
return Status::OK();
@@ -68,7 +75,6 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state,
vectorized::Block*
}
while (!runtime_state->is_cancelled() && status.ok() &&
_block_queue.empty() &&
(!need_commit || (need_commit && !_load_ids.empty()))) {
- CHECK_EQ(_single_block_queue_bytes->load(), 0);
auto left_milliseconds = _group_commit_interval_ms;
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
@@ -106,10 +112,8 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
*find_block = true;
_block_queue.pop_front();
_all_block_queues_bytes->fetch_sub(block->bytes(),
std::memory_order_relaxed);
- _single_block_queue_bytes->fetch_sub(block->bytes(),
std::memory_order_relaxed);
}
if (_block_queue.empty() && need_commit && _load_ids.empty()) {
- CHECK_EQ(_single_block_queue_bytes->load(), 0);
*eos = true;
} else {
*eos = false;
@@ -150,7 +154,6 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st)
{
{
auto& future_block = _block_queue.front();
_all_block_queues_bytes->fetch_sub(future_block->bytes(),
std::memory_order_relaxed);
- _single_block_queue_bytes->fetch_sub(future_block->bytes(),
std::memory_order_relaxed);
}
_block_queue.pop_front();
}
@@ -333,11 +336,12 @@ Status
GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
10000L);
result_status = Status::create(result.status);
}
+ std::shared_ptr<LoadBlockQueue> load_block_queue;
{
std::lock_guard<std::mutex> l(_lock);
auto it = _load_block_queues.find(instance_id);
if (it != _load_block_queues.end()) {
- auto& load_block_queue = it->second;
+ load_block_queue = it->second;
if (!status.ok()) {
load_block_queue->cancel(status);
}
@@ -357,7 +361,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t
db_id, int64_t table_
// result_status: commit txn result
if (status.ok() && st.ok() &&
(result_status.ok() ||
result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
- RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(txn_id));
+ RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(
+ txn_id, load_block_queue->block_queue_pre_allocated.load()));
RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id,
txn_id));
} else {
std::string wal_path;
@@ -443,8 +448,9 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t
db_id, int64_t table_i
}
group_commit_table = _table_map[table_id];
}
- return group_commit_table->get_first_block_load_queue(table_id,
base_schema_version, load_id,
- load_block_queue,
be_exe_version);
+ RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
+ table_id, base_schema_version, load_id, load_block_queue,
be_exe_version));
+ return Status::OK();
}
Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId&
instance_id,
@@ -461,11 +467,14 @@ Status GroupCommitMgr::get_load_block_queue(int64_t
table_id, const TUniqueId& i
}
return group_commit_table->get_load_block_queue(instance_id,
load_block_queue);
}
+
Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
const std::string& import_label, WalManager*
wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int
be_exe_version) {
+ RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->add_wal_path(db_id,
tb_id, wal_id,
+
import_label, wal_base_path));
_v_wal_writer = std::make_shared<vectorized::VWalWriter>(
- db_id, tb_id, wal_id, import_label, wal_manager, slot_desc,
be_exe_version);
+ tb_id, wal_id, import_label, wal_manager, slot_desc,
be_exe_version);
return _v_wal_writer->init();
}
@@ -475,4 +484,67 @@ Status LoadBlockQueue::close_wal() {
}
return Status::OK();
}
+
+bool LoadBlockQueue::has_enough_wal_disk_space(
+ const std::vector<std::shared_ptr<vectorized::Block>>& blocks, const
TUniqueId& load_id,
+ bool is_blocks_contain_all_load_data) {
+ size_t blocks_size = 0;
+ for (auto block : blocks) {
+ blocks_size += block->bytes();
+ }
+ size_t content_length = 0;
+ Status st =
ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id,
&content_length);
+ if (st.ok()) {
+
RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id));
+ } else {
+ return Status::InternalError("can not find load id.");
+ }
+ size_t pre_allocated = is_blocks_contain_all_load_data
+ ? blocks_size
+ : (blocks_size > content_length ?
blocks_size : content_length);
+ auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
+ size_t available_bytes = 0;
+ {
+ st = wal_mgr->get_wal_dir_available_size(wal_base_path,
&available_bytes);
+ if (!st.ok()) {
+ LOG(WARNING) << "get wal disk available size filed!";
+ }
+ }
+ if (pre_allocated < available_bytes) {
+ st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path,
pre_allocated, true);
+ if (!st.ok()) {
+ LOG(WARNING) << "update wal dir pre_allocated failed, reason: " <<
st.to_string();
+ }
+ block_queue_pre_allocated.fetch_add(pre_allocated);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t
content_length) {
+ std::unique_lock l(_load_info_lock);
+ if (_load_id_to_content_length_map.find(load_id) ==
_load_id_to_content_length_map.end()) {
+ _load_id_to_content_length_map.insert(std::make_pair(load_id,
content_length));
+ }
+ return Status::OK();
+}
+
+Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t*
content_length) {
+ std::shared_lock l(_load_info_lock);
+ if (_load_id_to_content_length_map.find(load_id) !=
_load_id_to_content_length_map.end()) {
+ *content_length = _load_id_to_content_length_map[load_id];
+ return Status::OK();
+ }
+ return Status::InternalError("can not find load id!");
+}
+
+Status GroupCommitMgr::remove_load_info(TUniqueId load_id) {
+ std::unique_lock l(_load_info_lock);
+ if (_load_id_to_content_length_map.find(load_id) ==
_load_id_to_content_length_map.end()) {
+ return Status::InternalError("can not remove load id!");
+ }
+ _load_id_to_content_length_map.erase(load_id);
+ return Status::OK();
+}
} // namespace doris
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index 6f625eda6ae..35afcc46249 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -20,9 +20,17 @@
#include <gen_cpp/PaloInternalService_types.h>
#include <atomic>
+#include <condition_variable>
+#include <cstdint>
#include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include <unordered_map>
+#include <utility>
#include "common/status.h"
+#include "olap/wal_manager.h"
+#include "runtime/exec_env.h"
#include "util/threadpool.h"
#include "vec/core/block.h"
#include "vec/sink/writer/vwal_writer.h"
@@ -45,9 +53,7 @@ public:
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
_start_time(std::chrono::steady_clock::now()),
_all_block_queues_bytes(all_block_queues_bytes),
- _group_commit_interval_ms(group_commit_interval_ms) {
- _single_block_queue_bytes = std::make_shared<std::atomic_size_t>(0);
- };
+ _group_commit_interval_ms(group_commit_interval_ms) {};
Status add_block(std::shared_ptr<vectorized::Block> block, bool write_wal);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
bool* find_block,
@@ -59,6 +65,8 @@ public:
WalManager* wal_manager, std::vector<TSlotDescriptor>&
slot_desc,
int be_exe_version);
Status close_wal();
+ bool has_enough_wal_disk_space(const
std::vector<std::shared_ptr<vectorized::Block>>& blocks,
+ const TUniqueId& load_id, bool
is_blocks_contain_all_load_data);
static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
UniqueId load_instance_id;
@@ -71,6 +79,8 @@ public:
bool process_finish = false;
std::condition_variable internal_group_commit_finish_cv;
Status status = Status::OK();
+ std::string wal_base_path;
+ std::atomic_size_t block_queue_pre_allocated = 0;
private:
void _cancel_without_lock(const Status& st);
@@ -84,8 +94,6 @@ private:
// memory consumption of all tables' load block queues, used for back
pressure.
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
- // memory consumption of one load block queue, used for correctness check.
- std::shared_ptr<std::atomic_size_t> _single_block_queue_bytes;
// group commit interval in ms, can be changed by 'ALTER TABLE my_table
SET ("group_commit_interval_ms"="1000");'
int64_t _group_commit_interval_ms;
std::shared_ptr<vectorized::VWalWriter> _v_wal_writer;
@@ -145,6 +153,10 @@ public:
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue,
int be_exe_version);
+ Status update_load_info(TUniqueId load_id, size_t content_length);
+ Status get_load_info(TUniqueId load_id, size_t* content_length);
+ Status remove_load_info(TUniqueId load_id);
+ std::condition_variable cv;
private:
ExecEnv* _exec_env = nullptr;
@@ -155,6 +167,8 @@ private:
std::unique_ptr<doris::ThreadPool> _thread_pool;
// memory consumption of all tables' load block queues, used for back
pressure.
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
+ std::shared_mutex _load_info_lock;
+ std::unordered_map<TUniqueId, size_t> _load_id_to_content_length_map;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp
b/be/src/vec/sink/group_commit_block_sink.cpp
index d4ede45868f..3e38f9c42fc 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -17,6 +17,14 @@
#include "vec/sink/group_commit_block_sink.h"
+#include <gen_cpp/DataSinks_types.h>
+
+#include <chrono>
+#include <mutex>
+#include <shared_mutex>
+
+#include "common/exception.h"
+#include "runtime/exec_env.h"
#include "runtime/group_commit_mgr.h"
#include "runtime/runtime_state.h"
#include "util/doris_metrics.h"
@@ -61,6 +69,8 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
Status GroupCommitBlockSink::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::prepare(state));
+ RETURN_IF_ERROR(
+
ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(_load_id.to_thrift(),
0));
_state = state;
// profile must add to state's object pool
@@ -107,7 +117,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state,
Status close_status) {
(double)state->num_rows_load_filtered() / num_selected_rows >
_max_filter_ratio) {
return Status::DataQualityError("too many filtered rows");
}
- RETURN_IF_ERROR(_add_blocks());
+ RETURN_IF_ERROR(_add_blocks(_group_commit_mode !=
TGroupCommitMode::SYNC_MODE, true));
}
if (_load_block_queue) {
_load_block_queue->remove_load_id(_load_id);
@@ -213,7 +223,7 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
_blocks.emplace_back(output_block);
} else {
if (!_is_block_appended) {
- RETURN_IF_ERROR(_add_blocks());
+ RETURN_IF_ERROR(_add_blocks(_group_commit_mode !=
TGroupCommitMode::SYNC_MODE, false));
}
RETURN_IF_ERROR(_load_block_queue->add_block(
output_block, _group_commit_mode !=
TGroupCommitMode::SYNC_MODE));
@@ -221,7 +231,7 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
return Status::OK();
}
-Status GroupCommitBlockSink::_add_blocks() {
+Status GroupCommitBlockSink::_add_blocks(bool write_wal, bool
is_blocks_contain_all_load_data) {
DCHECK(_is_block_appended == false);
TUniqueId load_id;
load_id.__set_hi(_load_id.hi);
@@ -231,6 +241,20 @@ Status GroupCommitBlockSink::_add_blocks() {
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
_db_id, _table_id, _base_schema_version, load_id,
_load_block_queue,
_state->be_exec_version()));
+ if (write_wal) {
+ _group_commit_mode =
_load_block_queue->has_enough_wal_disk_space(
+ _blocks, load_id,
is_blocks_contain_all_load_data)
+ ? TGroupCommitMode::ASYNC_MODE
+ : TGroupCommitMode::SYNC_MODE;
+ if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
+ LOG(INFO)
+ << "Load label " << _load_block_queue->label
+ << " will not write wal because wal disk space
usage reachs max limit.";
+ } else {
+ LOG(INFO) << "Load label " << _load_block_queue->label <<
" will write wal to "
+ << _load_block_queue->wal_base_path << ".";
+ }
+ }
_state->set_import_label(_load_block_queue->label);
_state->set_wal_id(_load_block_queue->txn_id);
} else {
diff --git a/be/src/vec/sink/group_commit_block_sink.h
b/be/src/vec/sink/group_commit_block_sink.h
index d7c8ed70d6d..c0971f4801a 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -47,7 +47,7 @@ public:
private:
Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block>
block);
- Status _add_blocks();
+ Status _add_blocks(bool write_wal, bool is_blocks_contain_all_load_data);
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
diff --git a/be/src/vec/sink/writer/vwal_writer.cpp
b/be/src/vec/sink/writer/vwal_writer.cpp
index d929207e9a9..2dc945a2a2f 100644
--- a/be/src/vec/sink/writer/vwal_writer.cpp
+++ b/be/src/vec/sink/writer/vwal_writer.cpp
@@ -42,11 +42,10 @@
namespace doris {
namespace vectorized {
-VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id,
- const std::string& import_label, WalManager*
wal_manager,
- std::vector<TSlotDescriptor>& slot_desc, int
be_exe_version)
- : _db_id(db_id),
- _tb_id(tb_id),
+VWalWriter::VWalWriter(int64_t tb_id, int64_t wal_id, const std::string&
import_label,
+ WalManager* wal_manager, std::vector<TSlotDescriptor>&
slot_desc,
+ int be_exe_version)
+ : _tb_id(tb_id),
_wal_id(wal_id),
_label(import_label),
_wal_manager(wal_manager),
@@ -56,7 +55,6 @@ VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t
wal_id,
VWalWriter::~VWalWriter() {}
Status VWalWriter::init() {
- RETURN_IF_ERROR(_wal_manager->add_wal_path(_db_id, _tb_id, _wal_id,
_label));
RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer));
_wal_manager->add_wal_status_queue(_tb_id, _wal_id,
WalManager::WAL_STATUS::CREATE);
std::stringstream ss;
diff --git a/be/src/vec/sink/writer/vwal_writer.h
b/be/src/vec/sink/writer/vwal_writer.h
index 17c9dc979a1..324409e9d46 100644
--- a/be/src/vec/sink/writer/vwal_writer.h
+++ b/be/src/vec/sink/writer/vwal_writer.h
@@ -83,7 +83,7 @@ namespace vectorized {
class VWalWriter {
public:
- VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, const
std::string& import_label,
+ VWalWriter(int64_t tb_id, int64_t wal_id, const std::string& import_label,
WalManager* wal_manager, std::vector<TSlotDescriptor>&
slot_desc,
int be_exe_version);
~VWalWriter();
@@ -92,7 +92,6 @@ public:
Status close();
private:
- int64_t _db_id;
int64_t _tb_id;
int64_t _wal_id;
uint32_t _version = 0;
diff --git a/be/test/http/stream_load_test.cpp
b/be/test/http/stream_load_test.cpp
index 75588f4190b..74dcf9f1f3f 100644
--- a/be/test/http/stream_load_test.cpp
+++ b/be/test/http/stream_load_test.cpp
@@ -19,6 +19,8 @@
#include <gtest/gtest.h>
+#include <memory>
+
#include "common/config.h"
#include "event2/buffer.h"
#include "event2/event.h"
@@ -34,6 +36,7 @@
#include "http/http_request.h"
#include "http/utils.h"
#include "olap/wal_manager.h"
+#include "runtime/exec_env.h"
namespace doris {
@@ -51,10 +54,14 @@ void http_request_done_cb(struct evhttp_request* req, void*
arg) {
TEST_F(StreamLoadTest, TestHeader) {
// 1G
- config::wal_max_disk_size = 1073741824;
+ auto wal_mgr = WalManager::create_shared(ExecEnv::GetInstance(),
config::group_commit_wal_path);
+ static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_1", 1000, 0, 0));
+ static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_2", 10000, 0,
0));
+ static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_3", 100000, 0,
0));
+ ExecEnv::GetInstance()->set_wal_mgr(wal_mgr);
// 1. empty info
{
- auto evhttp_req = evhttp_request_new(nullptr, nullptr);
+ auto* evhttp_req = evhttp_request_new(nullptr, nullptr);
HttpRequest req(evhttp_req);
EXPECT_EQ(load_size_smaller_than_wal_limit(&req), false);
evhttp_request_free(evhttp_req);
@@ -86,7 +93,7 @@ TEST_F(StreamLoadTest, TestHeader) {
evhttp_req->uri = uri;
evhttp_req->uri_elems = evhttp_uri_parse(uri);
evhttp_add_header(evhttp_req->input_headers,
HTTP_GROUP_COMMIT.c_str(), "true");
- evhttp_add_header(evhttp_req->input_headers,
HttpHeaders::CONTENT_LENGTH, "1000");
+ evhttp_add_header(evhttp_req->input_headers,
HttpHeaders::CONTENT_LENGTH, "20000");
HttpRequest req(evhttp_req);
req.init_from_evhttp();
EXPECT_EQ(load_size_smaller_than_wal_limit(&req), true);
diff --git a/be/test/olap/wal_manager_test.cpp
b/be/test/olap/wal_manager_test.cpp
index 17fb3e3e6f4..93d8636eb22 100644
--- a/be/test/olap/wal_manager_test.cpp
+++ b/be/test/olap/wal_manager_test.cpp
@@ -18,6 +18,7 @@
#include <gtest/gtest.h>
+#include <cstddef>
#include <filesystem>
#include <map>
#include <string>
@@ -27,6 +28,7 @@
#include "gen_cpp/HeartbeatService_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "io/fs/local_file_system.h"
+#include "olap/options.h"
#include "runtime/decimalv2_value.h"
#include "runtime/exec_env.h"
#include "runtime/result_queue_mgr.h"
@@ -46,8 +48,7 @@ extern Status k_stream_load_plan_status;
extern std::string k_request_line;
ExecEnv* _env = nullptr;
-std::string wal_dir = "./wal_test";
-std::string tmp_dir = "./wal_test/tmp";
+std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
class WalManagerTest : public testing::Test {
public:
@@ -63,6 +64,7 @@ public:
_env->_internal_client_cache = new
BrpcClientCache<PBackendService_Stub>();
_env->_function_client_cache = new
BrpcClientCache<PFunctionService_Stub>();
_env->_stream_load_executor = StreamLoadExecutor::create_shared(_env);
+ _env->_store_paths = {StorePath(std::filesystem::current_path(), 0)};
_env->_wal_manager = WalManager::create_shared(_env, wal_dir);
k_stream_load_begin_result = TLoadTxnBeginResult();
k_stream_load_plan_status = Status::OK();
@@ -78,16 +80,14 @@ public:
void prepare() {
static_cast<void>(io::global_local_filesystem()->create_directory(wal_dir)); }
void createWal(const std::string& wal_path) {
- std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes =
- std::make_shared<std::atomic_size_t>(0);
- std::shared_ptr<std::condition_variable> cv =
std::make_shared<std::condition_variable>();
- auto wal_writer = WalWriter(wal_path, _all_wal_disk_bytes, cv);
+ auto wal_writer = WalWriter(wal_path);
static_cast<void>(wal_writer.init());
static_cast<void>(wal_writer.finalize());
}
};
TEST_F(WalManagerTest, recovery_normal) {
+ _env->wal_mgr()->wal_limit_test_bytes = 1099511627776;
k_request_line = "{\"Status\": \"Success\", \"Message\": \"Test\"}";
std::string db_id = "1";
@@ -123,4 +123,93 @@ TEST_F(WalManagerTest, recovery_normal) {
ASSERT_TRUE(!std::filesystem::exists(wal_200));
ASSERT_TRUE(!std::filesystem::exists(wal_201));
}
+
+TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
+ auto wal_mgr = WalManager::create_shared(_env,
config::group_commit_wal_path);
+ static_cast<void>(wal_mgr->init());
+ _env->set_wal_mgr(wal_mgr);
+
+ // 1T
+ size_t available_bytes = 1099511627776;
+ size_t wal_limit_bytes;
+
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "0%";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 0);
+
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "5%";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+ wal_limit_bytes = available_bytes * 0.05;
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, wal_limit_bytes);
+
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "50%";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+ wal_limit_bytes = available_bytes * 0.5;
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, wal_limit_bytes);
+
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "200%";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+ wal_limit_bytes = available_bytes * 2;
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, wal_limit_bytes);
+
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "-10%";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(),
Status::InternalError(""));
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes);
+
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "0";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 0);
+
+ // 1M
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "1048576";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 1048576);
+
+ // 1G
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "1073741824";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 1073741824);
+
+ // 100G
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "107374182400";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 107374182400);
+
+ // 1M
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "1M";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 1048576);
+
+ // 1G
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "1G";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 1073741824);
+
+ // 100G
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "100G";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 107374182400);
+
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "-1024";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(),
Status::InternalError(""));
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes);
+
+ _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+ config::group_commit_wal_max_disk_limit = "-1M";
+ EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(),
Status::InternalError(""));
+ EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes);
+}
} // namespace doris
\ No newline at end of file
diff --git a/be/test/olap/wal_reader_writer_test.cpp
b/be/test/olap/wal_reader_writer_test.cpp
index 1d1102f350f..d24db728680 100644
--- a/be/test/olap/wal_reader_writer_test.cpp
+++ b/be/test/olap/wal_reader_writer_test.cpp
@@ -90,10 +90,7 @@ void generate_block(PBlock& pblock, int row_index) {
TEST_F(WalReaderWriterTest, TestWriteAndRead1) {
std::string file_name = _s_test_data_path + "/abcd123.txt";
- std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes =
- std::make_shared<std::atomic_size_t>(0);
- std::shared_ptr<std::condition_variable> cv =
std::make_shared<std::condition_variable>();
- auto wal_writer = WalWriter(file_name, _all_wal_disk_bytes, cv);
+ auto wal_writer = WalWriter(file_name);
static_cast<void>(wal_writer.init());
size_t file_len = 0;
int64_t file_size = -1;
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp
b/be/test/vec/exec/vtablet_sink_test.cpp
index 81c2bdc4b07..f0b8e8c9643 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -496,7 +496,7 @@ public:
private:
ExecEnv* _env = nullptr;
brpc::Server* _server = nullptr;
- std::string wal_dir = "./wal_test";
+ std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
};
TEST_F(VOlapTableSinkTest, normal) {
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp
b/be/test/vec/exec/vwal_scanner_test.cpp
index a239741a5cb..ac066aceb98 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -57,7 +57,7 @@ private:
void init_desc_table();
ExecEnv* _env = nullptr;
- std::string wal_dir = "./wal_test";
+ std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
int64_t db_id = 1;
int64_t tb_id = 2;
int64_t txn_id = 789;
@@ -195,6 +195,7 @@ void VWalScannerTest::init_desc_table() {
}
void VWalScannerTest::init() {
+ config::group_commit_wal_max_disk_limit = "100M";
init_desc_table();
static_cast<void>(io::global_local_filesystem()->create_directory(
wal_dir + "/" + std::to_string(db_id) + "/" +
std::to_string(tb_id)));
@@ -215,7 +216,9 @@ void VWalScannerTest::init() {
_env = ExecEnv::GetInstance();
_env->_wal_manager = WalManager::create_shared(_env, wal_dir);
- auto st = _env->_wal_manager->add_wal_path(db_id, tb_id, txn_id, label);
+ std::string base_path;
+ auto st = _env->_wal_manager->_init_wal_dirs_info();
+ st = _env->_wal_manager->add_wal_path(db_id, tb_id, txn_id, label,
base_path);
}
TEST_F(VWalScannerTest, normal) {
diff --git
a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
index f5cb97fbae8..9a081aa2d59 100644
---
a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
+++
b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
@@ -70,7 +70,7 @@ suite("test_group_commit_and_wal_back_pressure", "p2") {
unset 'label'
file 'test_group_commit_and_wal_back_pressure.csv.gz'
- time 100000
+ time 600000
}
})
}
@@ -88,7 +88,7 @@ suite("test_group_commit_and_wal_back_pressure", "p2") {
unset 'label'
file 'test_group_commit_and_wal_back_pressure.csv.gz'
- time 100000
+ time 600000
}
})
}
@@ -106,7 +106,7 @@ suite("test_group_commit_and_wal_back_pressure", "p2") {
unset 'label'
file 'test_group_commit_and_wal_back_pressure.csv.gz'
- time 100000
+ time 600000
}
})
}
diff --git
a/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy
b/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy
index 24b66de04b8..7361faf72df 100644
---
a/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy
+++
b/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy
@@ -41,7 +41,7 @@ suite("test_group_commit_wal_limit") {
// normal case
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser
+ ":" + context.config.jdbcPassword)
- strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" " )
+ strBuilder.append(" -H \"group_commit:async_mode\" -H
\"column_separator:,\" " )
strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" " )
strBuilder.append(" -T " + context.config.dataPath +
"/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
strBuilder.append(" http://" + context.config.feHttpAddress +
"/api/${db}/${tableName}/_stream_load")
@@ -55,49 +55,13 @@ suite("test_group_commit_wal_limit") {
logger.info("out is " + out )
assertTrue(out.contains('group_commit'))
- // chunked data case
- strBuilder = new StringBuilder()
- strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser
+ ":" + context.config.jdbcPassword)
- strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" " )
- strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" " )
- strBuilder.append(" -H \"Content-Length:0\" " )
- strBuilder.append(" -T " + context.config.dataPath +
"/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
- strBuilder.append(" http://" + context.config.feHttpAddress +
"/api/${db}/${tableName}/_stream_load")
-
- command = strBuilder.toString()
- logger.info("command is " + command)
- process = ['bash','-c',command].execute()
- code = process.waitFor()
- assertEquals(code, 0)
- out = process.text
- logger.info("out is " + out )
- assertTrue(out.contains('Stream load size too large'))
-
- // too lagre data case 1TB
- strBuilder = new StringBuilder()
- strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser
+ ":" + context.config.jdbcPassword)
- strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" " )
- strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" " )
- strBuilder.append(" -H \"Content-Length:1099511627776\" " )
- strBuilder.append(" -T " + context.config.dataPath +
"/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
- strBuilder.append(" http://" + context.config.feHttpAddress +
"/api/${db}/${tableName}/_stream_load")
-
- command = strBuilder.toString()
- logger.info("command is " + command)
- process = ['bash','-c',command].execute()
- code = process.waitFor()
- assertEquals(code, 0)
- out = process.text
- logger.info("out is " + out )
- assertTrue(out.contains('Stream load size too large'))
-
// httpload
// normal case
strBuilder = new StringBuilder()
strBuilder.append("curl -v --location-trusted -u " +
context.config.jdbcUser + ":" + context.config.jdbcPassword)
String sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v)
select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\",
\\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" "
strBuilder.append(sql)
- strBuilder.append(" -H \"group_commit:true\"")
+ strBuilder.append(" -H \"group_commit:async_mode\"")
strBuilder.append(" -T " + context.config.dataPath +
"/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
strBuilder.append(" http://" + context.config.feHttpAddress +
"/api/_http_stream")
@@ -109,40 +73,4 @@ suite("test_group_commit_wal_limit") {
out = process.text
logger.info("out is " + out )
assertTrue(out.contains('group_commit'))
-
- // chunked data case
- strBuilder = new StringBuilder()
- strBuilder.append("curl -v --location-trusted -u " +
context.config.jdbcUser + ":" + context.config.jdbcPassword)
- sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1,
c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" =
\\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" "
- strBuilder.append(sql)
- strBuilder.append(" -H \"group_commit:true\" -H \"Content-Length:0\"")
- strBuilder.append(" -T " + context.config.dataPath +
"/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
- strBuilder.append(" http://" + context.config.feHttpAddress +
"/api/_http_stream")
-
- command = strBuilder.toString()
- logger.info("command is " + command)
- process = ['bash','-c',command].execute()
- code = process.waitFor()
- assertEquals(code, 0)
- out = process.text
- logger.info("out is " + out )
- assertTrue(out.contains('Http load size too large'))
-
- // too lagre data case 1TB
- strBuilder = new StringBuilder()
- strBuilder.append("curl -v --location-trusted -u " +
context.config.jdbcUser + ":" + context.config.jdbcPassword)
- sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1,
c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" =
\\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" "
- strBuilder.append(sql)
- strBuilder.append(" -H \"group_commit:true\" -H
\"Content-Length:1099511627776\"")
- strBuilder.append(" -T " + context.config.dataPath +
"/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
- strBuilder.append(" http://" + context.config.feHttpAddress +
"/api/_http_stream")
-
- command = strBuilder.toString()
- logger.info("command is " + command)
- process = ['bash','-c',command].execute()
- code = process.waitFor()
- assertEquals(code, 0)
- out = process.text
- logger.info("out is " + out )
- assertTrue(out.contains('Http load size too large'))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]