This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ff8bd2e9c9 [Enhancement](Wal)Support dynamic wal space limit (#27726)
9ff8bd2e9c9 is described below

commit 9ff8bd2e9c995760b740800addd42eddfa8e25df
Author: abmdocrt <[email protected]>
AuthorDate: Wed Dec 27 11:51:32 2023 +0800

    [Enhancement](Wal)Support dynamic wal space limit (#27726)
---
 be/src/common/config.cpp                           |   7 +-
 be/src/common/config.h                             |   5 +-
 be/src/http/action/http_stream.cpp                 |  55 +++--
 be/src/http/action/stream_load.cpp                 |  64 ++++--
 be/src/http/utils.cpp                              |   7 +-
 be/src/io/fs/local_file_system.cpp                 |  13 ++
 be/src/io/fs/local_file_system.h                   |   1 +
 be/src/olap/wal_dirs_info.cpp                      | 224 +++++++++++++++++++++
 be/src/olap/wal_dirs_info.h                        |  85 ++++++++
 be/src/olap/wal_manager.cpp                        | 165 ++++++++++++---
 be/src/olap/wal_manager.h                          |  37 +++-
 be/src/olap/wal_writer.cpp                         |  28 +--
 be/src/olap/wal_writer.h                           |  10 +-
 be/src/runtime/exec_env.h                          |   1 +
 be/src/runtime/group_commit_mgr.cpp                |  92 ++++++++-
 be/src/runtime/group_commit_mgr.h                  |  24 ++-
 be/src/vec/sink/group_commit_block_sink.cpp        |  30 ++-
 be/src/vec/sink/group_commit_block_sink.h          |   2 +-
 be/src/vec/sink/writer/vwal_writer.cpp             |  10 +-
 be/src/vec/sink/writer/vwal_writer.h               |   3 +-
 be/test/http/stream_load_test.cpp                  |  13 +-
 be/test/olap/wal_manager_test.cpp                  | 101 +++++++++-
 be/test/olap/wal_reader_writer_test.cpp            |   5 +-
 be/test/vec/exec/vtablet_sink_test.cpp             |   2 +-
 be/test/vec/exec/vwal_scanner_test.cpp             |   7 +-
 .../test_group_commit_and_wal_back_pressure.groovy |   6 +-
 .../stream_load/test_group_commit_wal_limit.groovy |  76 +------
 27 files changed, 846 insertions(+), 227 deletions(-)
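Summary, as read from the diff below: the fixed BE config wal_max_disk_size (an Int32, default 64M) is replaced by group_commit_wal_max_disk_limit, a string that accepts either an absolute byte value or a percentage of the available disk space (default "10%"). Per-directory WAL accounting is added in the new be/src/olap/wal_dirs_info.{h,cpp} (limit, used and pre-allocated bytes per WAL directory), WalManager refreshes those numbers from a background thread, group-commit loads reserve WAL space up front via has_enough_wal_disk_space(), and the reservation is released when the WAL file is deleted after a successful commit.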

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9a886ff19c9..358db99e592 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1099,7 +1099,7 @@ DEFINE_Int32(grace_shutdown_wait_seconds, "120");
 DEFINE_Int16(bitmap_serialize_version, "1");
 
 // group commit insert config
-DEFINE_String(group_commit_wal_path, "./wal");
+DEFINE_String(group_commit_wal_path, "");
 DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
 DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
 DEFINE_Int32(group_commit_relay_wal_threads, "10");
@@ -1128,8 +1128,9 @@ DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
 // Max size(bytes) of group commit queues, used for mem back pressure, defult 64M.
 DEFINE_Int32(group_commit_max_queue_size, "67108864");
 
-// Max size(bytes) of wal disk using, used for disk space back pressure, default 64M.
-DEFINE_Int32(wal_max_disk_size, "67108864");
+// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
+// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
+DEFINE_String(group_commit_wal_max_disk_limit, "10%");
 
 // Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
 DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index bb341002902..a41a3d06141 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1203,8 +1203,9 @@ DECLARE_String(default_tzfiles_path);
 // Max size(bytes) of group commit queues, used for mem back pressure.
 DECLARE_Int32(group_commit_max_queue_size);
 
-// Max size(bytes) of wal disk using, used for disk space back pressure.
-DECLARE_Int32(wal_max_disk_size);
+// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
+// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
+DECLARE_mString(group_commit_wal_max_disk_limit);
 
 // Ingest binlog work pool size
 DECLARE_Int32(ingest_binlog_work_pool_size);
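For reference, the two accepted forms of the new limit, written here as illustrative be.conf entries (the values are examples, not defaults from this patch, except the 10%):

    # absolute cap in bytes
    group_commit_wal_max_disk_limit=1024
    # or a percentage of the available disk space (the default)
    group_commit_wal_max_disk_limit=10%
    # optional: explicit absolute WAL directories, separated by ";"
    # (when empty, a /wal subdirectory of each storage path is used)
    group_commit_wal_path=/data1/doris/wal;/data2/doris/wal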
diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index 8c75f6be740..18256237a81 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -46,6 +46,7 @@
 #include "http/utils.h"
 #include "io/fs/stream_load_pipe.h"
 #include "olap/storage_engine.h"
+#include "olap/wal_manager.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
@@ -169,31 +170,38 @@ int HttpStreamAction::on_header(HttpRequest* req) {
 
     Status st = Status::OK();
     std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
-    if (iequal(group_commit_mode, "off_mode")) {
-        group_commit_mode = "";
-    }
     if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
         !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
         st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
-        if (iequal(group_commit_mode, "off_mode")) {
-            group_commit_mode = "";
-        }
+    } else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
+        // off_mode and empty
+        group_commit_mode = "off_mode";
+        ctx->group_commit = false;
+    } else {
+        // sync_mode and async_mode
+        ctx->group_commit = true;
     }
     ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
     auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
     auto partitions = !req->header(HTTP_PARTITIONS).empty();
     if (!temp_partitions && !partitions && !ctx->two_phase_commit &&
         (!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) {
-        if (config::wait_internal_group_commit_finish) {
+        if (iequal(group_commit_mode, "async_mode") || config::wait_internal_group_commit_finish) {
             ctx->group_commit = true;
-        } else {
-            ctx->group_commit = load_size_smaller_than_wal_limit(req);
-            if (!ctx->group_commit) {
-                LOG(WARNING) << "The data size for this http load("
-                             << req->header(HttpHeaders::CONTENT_LENGTH)
-                             << " Bytes) exceeds the WAL (Write-Ahead Log) 
limit ("
-                             << config::wal_max_disk_size * 0.8
-                             << " Bytes). Please set this load to \"group 
commit\"=false.";
+            group_commit_mode = load_size_smaller_than_wal_limit(req) ? 
"async_mode" : "sync_mode";
+            if (iequal(group_commit_mode, "sync_mode")) {
+                size_t max_available_size =
+                        ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
+                LOG(INFO) << "When enable group commit, the data size can't be too large. The data "
+                             "size "
+                             "for this http load("
+                          << (req->header(HttpHeaders::CONTENT_LENGTH).empty()
+                                      ? 0
+                                      : std::stol(req->header(HttpHeaders::CONTENT_LENGTH)))
+                          << " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
+                          << max_available_size
+                          << " Bytes). So we set this load to \"group commit\"=sync_mode "
+                             "automatically.";
                 st = Status::Error<EXCEEDED_LIMIT>("Http load size too large.");
             }
         }
@@ -358,6 +366,23 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
     ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
     ctx->label = ctx->put_result.params.import_label;
     ctx->put_result.params.__set_wal_id(ctx->wal_id);
+    if (http_req->header(HTTP_GROUP_COMMIT) == "async mode") {
+        if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
+            size_t content_length = 0;
+            content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
+                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
+                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+                content_length *= 3;
+            }
+            RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
+                    ctx->id.to_thrift(), content_length));
+        }
+    }
 
     return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
 }
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 04c29a53027..eddf31856a0 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -32,6 +32,7 @@
 #include <thrift/protocol/TDebugProtocol.h>
 #include <time.h>
 
+#include <algorithm>
 #include <future>
 #include <map>
 #include <sstream>
@@ -189,15 +190,16 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     ctx->label = req->header(HTTP_LABEL_KEY);
     Status st = Status::OK();
     std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
-    if (iequal(group_commit_mode, "off_mode")) {
-        group_commit_mode = "";
-    }
     if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
         !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
         st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
-        if (iequal(group_commit_mode, "off_mode")) {
-            group_commit_mode = "";
-        }
+    } else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
+        // off_mode and empty
+        group_commit_mode = "off_mode";
+        ctx->group_commit = false;
+    } else {
+        // sync_mode and async_mode
+        ctx->group_commit = true;
     }
     auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
                            iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
@@ -206,19 +208,26 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     auto partitions = !req->header(HTTP_PARTITIONS).empty();
     if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit &&
         (!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) {
-        if (!group_commit_mode.empty() && !ctx->label.empty()) {
+        if (!config::wait_internal_group_commit_finish && ctx->group_commit &&
+            !ctx->label.empty()) {
             st = Status::InternalError("label and group_commit can't be set at 
the same time");
         }
-        if (config::wait_internal_group_commit_finish) {
+        if (iequal(group_commit_mode, "async_mode") || 
config::wait_internal_group_commit_finish) {
             ctx->group_commit = true;
-        } else {
-            ctx->group_commit = load_size_smaller_than_wal_limit(req);
-            if (!ctx->group_commit) {
-                LOG(WARNING) << "The data size for this stream load("
-                             << req->header(HttpHeaders::CONTENT_LENGTH)
-                             << " Bytes) exceeds the WAL (Write-Ahead Log) 
limit ("
-                             << config::wal_max_disk_size * 0.8
-                             << " Bytes). Please set this load to \"group 
commit\"=false.";
+            group_commit_mode = load_size_smaller_than_wal_limit(req) ? 
"async_mode" : "sync_mode";
+            if (iequal(group_commit_mode, "sync_mode")) {
+                size_t max_available_size =
+                        ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
+                LOG(INFO) << "When enable group commit, the data size can't be too large. The data "
+                             "size "
+                             "for this stream load("
+                          << (req->header(HttpHeaders::CONTENT_LENGTH).empty()
+                                      ? 0
+                                      : std::stol(req->header(HttpHeaders::CONTENT_LENGTH)))
+                          << " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
+                          << max_available_size
+                          << " Bytes). So we set this load to \"group commit\"=sync_mode "
+                             "automatically.";
                 st = Status::Error<EXCEEDED_LIMIT>("Stream load size too large.");
             }
         }
@@ -640,11 +649,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         request.__set_memtable_on_sink_node(value);
     }
     if (ctx->group_commit) {
-        if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
-            request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
-        } else {
-            request.__set_group_commit_mode("sync_mode");
-        }
+        request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
     }
 
 #ifndef BE_TEST
@@ -665,6 +670,23 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
         return plan_status;
     }
+    if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
+        if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
+            size_t content_length = 0;
+            content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
+                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
+                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+                content_length *= 3;
+            }
+            RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(
+                    ctx->id.to_thrift(), content_length));
+        }
+    }
 
     VLOG_NOTICE << "params is " << 
apache::thrift::ThriftDebugString(ctx->put_result.params);
     // if we not use streaming, we must download total content before we begin
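A note on the update_load_info() calls added to both _process_put() implementations above: the size that gets registered is the HTTP Content-Length, multiplied by 3 for the compressed CSV formats (GZ, LZO, BZ2, LZ4FRAME, LZOP, LZ4BLOCK, SNAPPYBLOCK), apparently as a rough allowance for the decompressed data that will be written to the WAL. As an illustrative calculation, a 100 MB gzip-compressed CSV stream load would be recorded as 300 MB for the later disk-space reservation.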
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index 0d66887663f..9fbf4c79300 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -22,6 +22,7 @@
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include <algorithm>
 #include <memory>
 #include <ostream>
 #include <vector>
@@ -38,6 +39,8 @@
 #include "http/http_status.h"
 #include "io/fs/file_system.h"
 #include "io/fs/local_file_system.h"
+#include "olap/wal_manager.h"
+#include "runtime/exec_env.h"
 #include "util/path_util.h"
 #include "util/url_coding.h"
 
@@ -199,8 +202,8 @@ bool load_size_smaller_than_wal_limit(HttpRequest* req) {
     // these blocks within the limited space. So we need to set group_commit = false to avoid dead lock.
     if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
         size_t body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
-        // TODO(Yukang): change it to WalManager::wal_limit
-        return (body_bytes <= config::wal_max_disk_size * 0.8) && (body_bytes != 0);
+        size_t max_available_size = ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
+        return (body_bytes != 0 && body_bytes < 0.8 * max_available_size);
     } else {
         return false;
     }
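With this change, load_size_smaller_than_wal_limit() compares the declared Content-Length against 80% of the largest per-directory available WAL space reported by WalManager::get_max_available_size(), instead of against the old global wal_max_disk_size. As a worked example with illustrative numbers: if the roomiest WAL directory has a 12 GB limit, 2 GB used and nothing pre-allocated, get_max_available_size() returns 10 GB, so a body of up to 8 GB passes the check, while a larger body (or a missing/zero Content-Length) fails it and the callers above report the load as too large (EXCEEDED_LIMIT).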
diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp
index 83e1554fe6b..a71708ac72c 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -172,6 +172,19 @@ Status LocalFileSystem::file_size_impl(const Path& file, int64_t* file_size) con
     return Status::OK();
 }
 
+Status LocalFileSystem::directory_size(const Path& dir_path, size_t* dir_size) {
+    *dir_size = 0;
+    if (std::filesystem::exists(dir_path) && std::filesystem::is_directory(dir_path)) {
+        for (const auto& entry : std::filesystem::recursive_directory_iterator(dir_path)) {
+            if (std::filesystem::is_regular_file(entry)) {
+                *dir_size += std::filesystem::file_size(entry);
+            }
+        }
+        return Status::OK();
+    }
+    return Status::IOError("faile to get dir size {}", dir_path.native());
+}
+
 Status LocalFileSystem::list_impl(const Path& dir, bool only_file, std::vector<FileInfo>* files,
                                   bool* exists) {
     RETURN_IF_ERROR(exists_impl(dir, exists));
diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h
index 53df0b8e06b..4f7e5107fb3 100644
--- a/be/src/io/fs/local_file_system.h
+++ b/be/src/io/fs/local_file_system.h
@@ -78,6 +78,7 @@ public:
     // "safe" means the path will be concat with the path prefix 
config::user_files_secure_path,
     // so that it can not list any files outside the 
config::user_files_secure_path
     Status safe_glob(const std::string& path, std::vector<FileInfo>* res);
+    Status directory_size(const Path& dir_path, size_t* dir_size);
 
 protected:
     Status create_file_impl(const Path& file, FileWriterPtr* writer,
diff --git a/be/src/olap/wal_dirs_info.cpp b/be/src/olap/wal_dirs_info.cpp
new file mode 100644
index 00000000000..340d896a8c6
--- /dev/null
+++ b/be/src/olap/wal_dirs_info.cpp
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/wal_dirs_info.h"
+
+#include <memory>
+#include <mutex>
+#include <shared_mutex>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "io/fs/local_file_system.h"
+#include "util/parse_util.h"
+
+namespace doris {
+
+std::string WalDirInfo::get_wal_dir() {
+    return _wal_dir;
+}
+
+size_t WalDirInfo::get_limit() {
+    std::shared_lock rlock(_lock);
+    return _limit;
+}
+
+size_t WalDirInfo::get_used() {
+    std::shared_lock rlock(_lock);
+    return _used;
+}
+
+size_t WalDirInfo::get_pre_allocated() {
+    std::shared_lock rlock(_lock);
+    return _pre_allocated;
+}
+
+Status WalDirInfo::set_limit(size_t limit) {
+    std::unique_lock wlock(_lock);
+    _limit = limit;
+    return Status::OK();
+}
+
+Status WalDirInfo::set_used(size_t used) {
+    std::unique_lock wlock(_lock);
+    _used = used;
+    return Status::OK();
+}
+
+Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) {
+    std::unique_lock wlock(_lock);
+    if (is_add_pre_allocated) {
+        _pre_allocated += pre_allocated;
+    } else {
+        _pre_allocated -= pre_allocated;
+    }
+    return Status::OK();
+}
+
+size_t WalDirInfo::available() {
+    std::unique_lock wlock(_lock);
+    int64_t available = _limit - _used - _pre_allocated;
+    return available > 0 ? available : 0;
+}
+
+Status WalDirInfo::update_wal_dir_limit(size_t limit) {
+    if (limit != static_cast<size_t>(-1)) {
+        RETURN_IF_ERROR(set_limit(limit));
+    } else {
+        size_t available_bytes;
+        size_t disk_capacity_bytes;
+        RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(
+                _wal_dir, &disk_capacity_bytes, &available_bytes));
+        bool is_percent = true;
+        int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit,
+                                                           -1, available_bytes, &is_percent);
+        if (wal_disk_limit <= 0) {
+            return Status::InternalError("Disk full! Please check your disk usage!");
+        }
+        size_t wal_dir_size = 0;
+        RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size));
+        RETURN_IF_ERROR(set_limit(wal_disk_limit));
+    }
+    return Status::OK();
+}
+
+Status WalDirInfo::update_wal_dir_used(size_t used) {
+    if (used != static_cast<size_t>(-1)) {
+        RETURN_IF_ERROR(set_used(used));
+    } else {
+        size_t wal_dir_size = 0;
+        RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size));
+        RETURN_IF_ERROR(set_used(wal_dir_size));
+    }
+    return Status::OK();
+}
+
+Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) {
+    RETURN_IF_ERROR(set_pre_allocated(pre_allocated, is_add_pre_allocated));
+    return Status::OK();
+}
+
+Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used,
+                        size_t pre_allocated) {
+    for (const auto& it : _wal_dirs_info_vec) {
+        if (it->get_wal_dir() == wal_dir) {
+#ifdef BE_TEST
+            return Status::OK();
+#endif
+            return Status::InternalError("wal dir {} exists!", wal_dir);
+        }
+    }
+    std::unique_lock wlock(_lock);
+    _wal_dirs_info_vec.emplace_back(
+            std::make_shared<WalDirInfo>(wal_dir, limit, used, pre_allocated));
+    return Status::OK();
+}
+
+Status WalDirsInfo::clear() {
+    std::unique_lock wlock(_lock);
+    _wal_dirs_info_vec.clear();
+    return Status::OK();
+}
+
+std::string WalDirsInfo::get_available_random_wal_dir() {
+    if (_wal_dirs_info_vec.size() == 1) {
+        return (*_wal_dirs_info_vec.begin())->get_wal_dir();
+    } else {
+        std::vector<std::string> available_wal_dirs;
+        for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+            if (wal_dir_info->available() > wal_dir_info->get_limit() * 0.2) {
+                available_wal_dirs.emplace_back(wal_dir_info->get_wal_dir());
+            }
+        }
+        if (available_wal_dirs.empty()) {
+            return (*std::min_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(),
+                                      [](const auto& info1, const auto& info2) {
+                                          return info1->available() < info2->available();
+                                      }))
+                    ->get_wal_dir();
+        } else {
+            return (*std::next(_wal_dirs_info_vec.begin(), rand() % _wal_dirs_info_vec.size()))
+                    ->get_wal_dir();
+        }
+    }
+}
+
+size_t WalDirsInfo::get_max_available_size() {
+    return _wal_dirs_info_vec.size() == 1
+                   ? (*_wal_dirs_info_vec.begin())->available()
+                   : (*std::max_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(),
+                                        [](const auto& info1, const auto& info2) {
+                                            return info1->available() < info2->available();
+                                        }))
+                             ->available();
+}
+
+Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) {
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        if (wal_dir_info->get_wal_dir() == wal_dir) {
+            return wal_dir_info->update_wal_dir_limit(limit);
+        }
+    }
+    return Status::InternalError("Can not find wal dir in wal disks info.");
+}
+
+Status WalDirsInfo::update_all_wal_dir_limit() {
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        RETURN_IF_ERROR(wal_dir_info->update_wal_dir_limit(-1));
+    }
+    return Status::OK();
+}
+
+Status WalDirsInfo::update_wal_dir_used(std::string wal_dir, size_t used) {
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        if (wal_dir_info->get_wal_dir() == wal_dir) {
+            return wal_dir_info->update_wal_dir_used(used);
+        }
+    }
+    return Status::InternalError("Can not find wal dir in wal disks info.");
+}
+
+Status WalDirsInfo::update_all_wal_dir_used() {
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        RETURN_IF_ERROR(wal_dir_info->update_wal_dir_used(-1));
+    }
+    return Status::OK();
+}
+
+Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated,
+                                                 bool is_add_pre_allocated) {
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        if (wal_dir_info->get_wal_dir() == wal_dir) {
+            return wal_dir_info->update_wal_dir_pre_allocated(pre_allocated, is_add_pre_allocated);
+        }
+    }
+    return Status::InternalError("Can not find wal dir in wal disks info.");
+}
+
+Status WalDirsInfo::get_wal_dir_available_size(const std::string& wal_dir,
+                                               size_t* available_bytes) {
+    std::shared_lock l(_lock);
+    for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        if (wal_dir_info->get_wal_dir() == wal_dir) {
+            *available_bytes = wal_dir_info->available();
+            return Status::OK();
+        }
+    }
+    return Status::InternalError("can not find wal dir!");
+}
+
+} // namespace doris
\ No newline at end of file
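A short sketch of the accounting model in the new file above, assuming all sizes are in bytes: each WalDirInfo tracks limit, used and pre_allocated, and available() returns limit - used - pre_allocated clamped at zero. For example, a directory with a 10 GB limit, 3 GB used and 1 GB pre-allocated reports 6 GB available. get_available_random_wal_dir() treats a directory as a candidate only when its available() exceeds 20% of its limit, and get_max_available_size() reports the largest available() across all WAL directories, which is the value the HTTP load size checks compare against.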
diff --git a/be/src/olap/wal_dirs_info.h b/be/src/olap/wal_dirs_info.h
new file mode 100644
index 00000000000..048cd8f9564
--- /dev/null
+++ b/be/src/olap/wal_dirs_info.h
@@ -0,0 +1,85 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "common/factory_creator.h"
+#include "common/status.h"
+
+namespace doris {
+class WalDirInfo {
+    ENABLE_FACTORY_CREATOR(WalDirInfo);
+
+public:
+    WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated)
+            : _wal_dir(std::move(wal_dir)),
+              _limit(limit),
+              _used(used),
+              _pre_allocated(pre_allocated) {}
+    std::string get_wal_dir();
+    size_t get_limit();
+    size_t get_used();
+    size_t get_pre_allocated();
+    Status set_limit(size_t limit);
+    Status set_used(size_t used);
+    Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated);
+    size_t available();
+    Status update_wal_dir_limit(size_t limit = -1);
+    Status update_wal_dir_used(size_t used = -1);
+    Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true);
+
+private:
+    std::string _wal_dir;
+    size_t _limit;
+    size_t _used;
+    size_t _pre_allocated;
+    std::shared_mutex _lock;
+};
+
+class WalDirsInfo {
+    ENABLE_FACTORY_CREATOR(WalDirsInfo);
+
+public:
+    WalDirsInfo() = default;
+    ~WalDirsInfo() = default;
+    Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated);
+    Status clear();
+    std::string get_available_random_wal_dir();
+    size_t get_max_available_size();
+    Status update_wal_dir_limit(std::string wal_dir, size_t limit = -1);
+    Status update_all_wal_dir_limit();
+    Status update_wal_dir_used(std::string wal_dir, size_t used = -1);
+    Status update_all_wal_dir_used();
+    Status update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated,
+                                        bool is_add_pre_allocated);
+    Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes);
+
+private:
+    std::vector<std::shared_ptr<WalDirInfo>> _wal_dirs_info_vec;
+    std::shared_mutex _lock;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp
index 238f88ebd00..c62996da19b 100644
--- a/be/src/olap/wal_manager.cpp
+++ b/be/src/olap/wal_manager.cpp
@@ -17,6 +17,7 @@
 
 #include "olap/wal_manager.h"
 
+#include <glog/logging.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
 #include <atomic>
@@ -24,14 +25,24 @@
 #include <cstdint>
 #include <filesystem>
 #include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include <thread>
+#include <unordered_map>
 #include <utility>
+#include <vector>
 
+#include "common/config.h"
+#include "common/status.h"
 #include "io/fs/local_file_system.h"
+#include "olap/wal_dirs_info.h"
 #include "olap/wal_writer.h"
 #include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/plan_fragment_executor.h"
 #include "runtime/stream_load/stream_load_context.h"
+#include "util/parse_util.h"
 #include "util/path_util.h"
 #include "util/thrift_rpc_helper.h"
 #include "vec/exec/format/wal/wal_reader.h"
@@ -40,12 +51,11 @@ namespace doris {
 WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
         : _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false) {
     doris::vectorized::WalReader::string_split(wal_dir_list, ";", _wal_dirs);
-    _all_wal_disk_bytes = std::make_shared<std::atomic_size_t>(0);
-    _cv = std::make_shared<std::condition_variable>();
     static_cast<void>(ThreadPoolBuilder("GroupCommitReplayWalThreadPool")
                               .set_min_threads(1)
                               .set_max_threads(config::group_commit_relay_wal_threads)
                               .build(&_thread_pool));
+    _wal_dirs_info = WalDirsInfo::create_unique();
 }
 
 WalManager::~WalManager() {
@@ -60,12 +70,46 @@ void WalManager::stop() {
         if (_replay_thread) {
             _replay_thread->join();
         }
+        if (_update_wal_dirs_info_thread) {
+            _update_wal_dirs_info_thread->join();
+        }
         _thread_pool->shutdown();
         LOG(INFO) << "WalManager is stopped";
     }
 }
 
 Status WalManager::init() {
+    RETURN_IF_ERROR(_init_wal_dirs_conf());
+    RETURN_IF_ERROR(_init_wal_dirs());
+    RETURN_IF_ERROR(_init_wal_dirs_info());
+    return Thread::create(
+            "WalMgr", "replay_wal", [this]() { 
static_cast<void>(this->replay()); },
+            &_replay_thread);
+}
+
+Status WalManager::_init_wal_dirs_conf() {
+    std::vector<std::string> tmp_dirs;
+    if (_wal_dirs.empty()) {
+        // default case.
+        for (const StorePath& path : ExecEnv::GetInstance()->store_paths()) {
+            tmp_dirs.emplace_back(path.path + "/wal");
+        }
+    } else {
+        // user config must be absolute path.
+        for (const std::string& wal_dir : _wal_dirs) {
+            if (std::filesystem::path(wal_dir).is_absolute()) {
+                tmp_dirs.emplace_back(wal_dir);
+            } else {
+                return Status::InternalError(
+                        "BE config group_commit_replay_wal_dir has to be 
absolute path!");
+            }
+        }
+    }
+    _wal_dirs = tmp_dirs;
+    return Status::OK();
+}
+
+Status WalManager::_init_wal_dirs() {
     bool exists = false;
     for (auto wal_dir : _wal_dirs) {
         std::string tmp_dir = wal_dir + "/tmp";
@@ -80,9 +124,44 @@ Status WalManager::init() {
         }
         RETURN_IF_ERROR(scan_wals(wal_dir));
     }
+    return Status::OK();
+}
+
+Status WalManager::_init_wal_dirs_info() {
+    for (const std::string& wal_dir : _wal_dirs) {
+        size_t available_bytes;
+#ifndef BE_TEST
+        size_t disk_capacity_bytes;
+        RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(wal_dir, &disk_capacity_bytes,
+                                                                      &available_bytes));
+#else
+        available_bytes = wal_limit_test_bytes;
+#endif
+        bool is_percent = true;
+        int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit,
+                                                           -1, available_bytes, &is_percent);
+        if (wal_disk_limit < 0) {
+            return Status::InternalError(
+                    "group_commit_wal_max_disk_limit config is wrong, please 
check your config!");
+        }
+        // if there are some wal files in wal dir, we need to add it to wal 
disk limit.
+        size_t wal_dir_size = 0;
+#ifndef BE_TEST
+        RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(wal_dir, 
&wal_dir_size));
+#endif
+        if (is_percent) {
+            wal_disk_limit += wal_dir_size;
+        }
+        RETURN_IF_ERROR(_wal_dirs_info->add(wal_dir, wal_disk_limit, wal_dir_size, 0));
+
+#ifdef BE_TEST
+        wal_limit_test_bytes = wal_disk_limit;
+#endif
+    }
     return Thread::create(
-            "WalMgr", "replay_wal", [this]() { 
static_cast<void>(this->replay()); },
-            &_replay_thread);
+            "WalMgr", "update_wal_dir_info",
+            [this]() { static_cast<void>(this->_update_wal_dir_info_thread()); },
+            &_update_wal_dirs_info_thread);
 }
 
 void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) {
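To make the limit computation in _init_wal_dirs_info() concrete (illustrative numbers, not taken from the patch): with the default group_commit_wal_max_disk_limit of "10%" and 100 GB of available disk space, ParseUtil::parse_mem_spec() yields a 10 GB limit; if the directory already holds 2 GB of WAL files from a previous run, the percentage-based limit is raised to 12 GB (wal_disk_limit += wal_dir_size) and the directory is registered with used = 2 GB, leaving roughly 10 GB for new WALs. An absolute setting such as group_commit_wal_max_disk_limit=1024 is taken as-is, without that adjustment.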
@@ -158,9 +237,8 @@ void WalManager::print_wal_status_queue() {
 }
 
 Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
-                                const std::string& label) {
-    std::string base_path =
-            _wal_dirs.size() == 1 ? _wal_dirs[0] : _wal_dirs[rand() % _wal_dirs.size()];
+                                const std::string& label, std::string& base_path) {
+    base_path = _wal_dirs_info->get_available_random_wal_dir();
     std::stringstream ss;
     ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/"
        << std::to_string(wal_id) << "_" << label;
@@ -209,7 +287,7 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>&
         RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path));
     }
     LOG(INFO) << "create wal " << wal_path;
-    wal_writer = std::make_shared<WalWriter>(wal_path, _all_wal_disk_bytes, _cv);
+    wal_writer = std::make_shared<WalWriter>(wal_path);
     RETURN_IF_ERROR(wal_writer->init());
     {
         std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
@@ -250,7 +328,7 @@ Status WalManager::scan_wals(const std::string& wal_path) {
                              << ", st=" << st.to_string();
                 return st;
             }
-            if (wals.size() == 0) {
+            if (wals.empty()) {
                 continue;
             }
             std::vector<std::string> res;
@@ -331,6 +409,12 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector<
         table_ptr = it->second;
     }
     table_ptr->add_wals(wals);
+#ifndef BE_TEST
+    for (auto wal : wals) {
+        RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)));
+        RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)));
+    }
+#endif
     return Status::OK();
 }
 
@@ -344,31 +428,20 @@ size_t WalManager::get_wal_table_size(int64_t table_id) {
     }
 }
 
-Status WalManager::delete_wal(int64_t wal_id) {
+Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated) {
+    std::string wal_path;
     {
         std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
-        if (_wal_id_to_writer_map.find(wal_id) != _wal_id_to_writer_map.end()) {
-            _all_wal_disk_bytes->fetch_sub(_wal_id_to_writer_map[wal_id]->disk_bytes(),
-                                           std::memory_order_relaxed);
-            _cv->notify_one();
-            std::string wal_path = _wal_path_map[wal_id];
-            LOG(INFO) << "wal delete file=" << wal_path << ", this file disk 
usage is "
-                      << _wal_id_to_writer_map[wal_id]->disk_bytes()
-                      << " ,after deleting it, all wals disk usage is "
-                      << _all_wal_disk_bytes->load(std::memory_order_relaxed);
-            _wal_id_to_writer_map.erase(wal_id);
-        }
-        if (_wal_id_to_writer_map.empty()) {
-            CHECK_EQ(_all_wal_disk_bytes->load(std::memory_order_relaxed), 0);
-        }
         auto it = _wal_path_map.find(wal_id);
         if (it != _wal_path_map.end()) {
-            std::string wal_path = it->second;
+            wal_path = it->second;
             RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
             LOG(INFO) << "delete file=" << wal_path;
             _wal_path_map.erase(wal_id);
         }
     }
+    RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path),
+                                                 block_queue_pre_allocated, false));
     return Status::OK();
 }
 
@@ -409,4 +482,46 @@ Status WalManager::get_wal_column_index(int64_t wal_id, std::vector<size_t>& col
     return Status::OK();
 }
 
+size_t WalManager::get_max_available_size() {
+    return _wal_dirs_info->get_max_available_size();
+}
+
+Status WalManager::update_wal_dir_limit(const std::string& wal_dir, size_t limit) {
+    return _wal_dirs_info->update_wal_dir_limit(wal_dir, limit);
+}
+
+Status WalManager::update_wal_dir_used(const std::string& wal_dir, size_t used) {
+    return _wal_dirs_info->update_wal_dir_used(wal_dir, used);
+}
+
+Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated,
+                                                bool is_add_pre_allocated) {
+    return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, pre_allocated,
+                                                        is_add_pre_allocated);
+}
+
+Status WalManager::_update_wal_dir_info_thread() {
+    while (!_stop.load()) {
+        static_cast<void>(_wal_dirs_info->update_all_wal_dir_limit());
+        static_cast<void>(_wal_dirs_info->update_all_wal_dir_used());
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+    return Status::OK();
+}
+
+Status WalManager::get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes) {
+    return _wal_dirs_info->get_wal_dir_available_size(wal_dir, available_bytes);
+}
+
+std::string WalManager::_get_base_wal_path(const std::string& wal_path_str) {
+    io::Path wal_path = wal_path_str;
+    for (int i = 0; i < 3; ++i) {
+        if (!wal_path.has_parent_path()) {
+            return "";
+        }
+        wal_path = wal_path.parent_path();
+    }
+    return wal_path.string();
+}
+
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h
index 83847beabf0..8d330a64bb9 100644
--- a/be/src/olap/wal_manager.h
+++ b/be/src/olap/wal_manager.h
@@ -16,15 +16,24 @@
 // under the License.
 
 #pragma once
+
 #include <gen_cpp/PaloInternalService_types.h>
 
+#include <atomic>
 #include <condition_variable>
+#include <cstddef>
+#include <cstdint>
 #include <memory>
+#include <shared_mutex>
+#include <thread>
+#include <unordered_map>
 
 #include "common/config.h"
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/FrontendService_types.h"
 #include "gen_cpp/HeartbeatService_types.h"
+#include "gutil/ref_counted.h"
+#include "olap/wal_dirs_info.h"
 #include "olap/wal_reader.h"
 #include "olap/wal_table.h"
 #include "olap/wal_writer.h"
@@ -47,7 +56,7 @@ public:
 public:
     WalManager(ExecEnv* exec_env, const std::string& wal_dir);
     ~WalManager();
-    Status delete_wal(int64_t wal_id);
+    Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0);
     Status init();
     Status scan_wals(const std::string& wal_path);
     Status replay();
@@ -56,7 +65,8 @@ public:
     Status scan();
     size_t get_wal_table_size(int64_t table_id);
     Status add_recover_wal(int64_t db_id, int64_t table_id, std::vector<std::string> wals);
-    Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label);
+    Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label,
+                        std::string& base_path);
     Status get_wal_path(int64_t wal_id, std::string& wal_path);
     Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
                                      PGetWalQueueSizeResponse* response);
@@ -70,10 +80,30 @@ public:
     void erase_wal_column_index(int64_t wal_id);
     Status get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);
 
+    Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1);
+    Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1);
+    Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated,
+                                        bool is_add_pre_allocated);
+    Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes);
+    size_t get_max_available_size();
+
+private:
+    Status _init_wal_dirs_conf();
+    Status _init_wal_dirs();
+    Status _init_wal_dirs_info();
+    std::string _get_base_wal_path(const std::string& wal_path_str);
+    const std::string& _get_available_random_wal_dir();
+    Status _update_wal_dir_info_thread();
+
+public:
+    // used for be ut
+    size_t wal_limit_test_bytes;
+
 private:
     ExecEnv* _exec_env = nullptr;
     std::shared_mutex _lock;
     scoped_refptr<Thread> _replay_thread;
+    scoped_refptr<Thread> _update_wal_dirs_info_thread;
     CountDownLatch _stop_background_threads_latch;
     std::map<int64_t, std::shared_ptr<WalTable>> _table_map;
     std::vector<std::string> _wal_dirs;
@@ -81,12 +111,11 @@ private:
     std::shared_mutex _wal_status_lock;
     std::unordered_map<int64_t, std::string> _wal_path_map;
     std::unordered_map<int64_t, std::shared_ptr<WalWriter>> _wal_id_to_writer_map;
-    std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
     std::unordered_map<int64_t, std::unordered_map<int64_t, WAL_STATUS>> _wal_status_queues;
     std::atomic<bool> _stop;
     std::shared_mutex _wal_column_id_map_lock;
     std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
-    std::shared_ptr<std::condition_variable> _cv;
     std::unique_ptr<doris::ThreadPool> _thread_pool;
+    std::unique_ptr<WalDirsInfo> _wal_dirs_info;
 };
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_writer.cpp b/be/src/olap/wal_writer.cpp
index 9d3da90d887..beae6ec80b6 100644
--- a/be/src/olap/wal_writer.cpp
+++ b/be/src/olap/wal_writer.cpp
@@ -26,6 +26,7 @@
 #include "io/fs/local_file_system.h"
 #include "io/fs/path.h"
 #include "olap/storage_engine.h"
+#include "olap/wal_manager.h"
 #include "util/crc32c.h"
 
 namespace doris {
@@ -33,14 +34,7 @@ namespace doris {
 const char* k_wal_magic = "WAL1";
 const uint32_t k_wal_magic_length = 4;
 
-WalWriter::WalWriter(const std::string& file_name,
-                     const std::shared_ptr<std::atomic_size_t>& all_wal_disk_bytes,
-                     const std::shared_ptr<std::condition_variable>& cv)
-        : cv(cv),
-          _file_name(file_name),
-          _disk_bytes(0),
-          _all_wal_disk_bytes(all_wal_disk_bytes),
-          _is_first_append_blocks(true) {}
+WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {}
 
 WalWriter::~WalWriter() {}
 
@@ -58,22 +52,6 @@ Status WalWriter::finalize() {
 }
 
 Status WalWriter::append_blocks(const PBlockArray& blocks) {
-    {
-        if (_is_first_append_blocks) {
-            _is_first_append_blocks = false;
-            std::unique_lock l(_mutex);
-            while (_all_wal_disk_bytes->load(std::memory_order_relaxed) >
-                   config::wal_max_disk_size) {
-                LOG(INFO) << "First time to append blocks to wal file " << 
_file_name
-                          << ". Currently, all wal disk space usage is "
-                          << 
_all_wal_disk_bytes->load(std::memory_order_relaxed)
-                          << ", larger than the maximum limit " << 
config::wal_max_disk_size
-                          << ", so we need to wait. When any other load 
finished, that wal will be "
-                             "removed, the space used by that wal will be 
free.";
-                cv->wait_for(l, 
std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME));
-            }
-        }
-    }
     size_t total_size = 0;
     for (const auto& block : blocks) {
         total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE;
@@ -99,8 +77,6 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) {
                 "failed to write block to wal expected= " + 
std::to_string(total_size) +
                 ",actually=" + std::to_string(offset));
     }
-    _disk_bytes.fetch_add(total_size, std::memory_order_relaxed);
-    _all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed);
     return Status::OK();
 }
 
diff --git a/be/src/olap/wal_writer.h b/be/src/olap/wal_writer.h
index 88ff4659769..ea8bba4f021 100644
--- a/be/src/olap/wal_writer.h
+++ b/be/src/olap/wal_writer.h
@@ -34,16 +34,13 @@ extern const uint32_t k_wal_magic_length;
 
 class WalWriter {
 public:
-    explicit WalWriter(const std::string& file_name,
-                       const std::shared_ptr<std::atomic_size_t>& all_wal_disk_bytes,
-                       const std::shared_ptr<std::condition_variable>& cv);
+    explicit WalWriter(const std::string& file_name);
     ~WalWriter();
 
     Status init();
     Status finalize();
 
     Status append_blocks(const PBlockArray& blocks);
-    size_t disk_bytes() const { return _disk_bytes.load(std::memory_order_relaxed); };
     Status append_header(uint32_t version, std::string col_ids);
 
     std::string file_name() { return _file_name; };
@@ -51,17 +48,12 @@ public:
 public:
     static const int64_t LENGTH_SIZE = 8;
     static const int64_t CHECKSUM_SIZE = 4;
-    std::shared_ptr<std::condition_variable> cv;
     static const int64_t VERSION_SIZE = 4;
 
 private:
     static constexpr size_t MAX_WAL_WRITE_WAIT_TIME = 1000;
     std::string _file_name;
     io::FileWriterPtr _file_writer;
-    std::atomic_size_t _disk_bytes;
-    std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
-    std::mutex _mutex;
-    bool _is_first_append_blocks;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4f4325a79c7..4c1060891f5 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -229,6 +229,7 @@ public:
     void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) {
         this->_routine_load_task_executor = r;
     }
+    void set_wal_mgr(std::shared_ptr<WalManager> wm) { this->_wal_manager = wm; }
 
 #endif
     stream_load::LoadStreamStubPool* load_stream_stub_pool() {
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 201a290a6e8..28df650f13c 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -21,14 +21,22 @@
 #include <glog/logging.h>
 
 #include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include <vector>
 
 #include "client_cache.h"
 #include "common/config.h"
+#include "common/status.h"
 #include "olap/wal_manager.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/runtime_state.h"
 #include "util/thrift_rpc_helper.h"
+#include "vec/core/block.h"
 
 namespace doris {
 
@@ -46,7 +54,6 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block, bool
             RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get()));
         }
         _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
-        _single_block_queue_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
     }
     _get_cond.notify_all();
     return Status::OK();
@@ -68,7 +75,6 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
     }
     while (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() &&
            (!need_commit || (need_commit && !_load_ids.empty()))) {
-        CHECK_EQ(_single_block_queue_bytes->load(), 0);
         auto left_milliseconds = _group_commit_interval_ms;
         auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                                 std::chrono::steady_clock::now() - _start_time)
@@ -106,10 +112,8 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
         *find_block = true;
         _block_queue.pop_front();
         _all_block_queues_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed);
-        _single_block_queue_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed);
     }
     if (_block_queue.empty() && need_commit && _load_ids.empty()) {
-        CHECK_EQ(_single_block_queue_bytes->load(), 0);
         *eos = true;
     } else {
         *eos = false;
@@ -150,7 +154,6 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
         {
             auto& future_block = _block_queue.front();
             _all_block_queues_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed);
-            _single_block_queue_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed);
         }
         _block_queue.pop_front();
     }
@@ -333,11 +336,12 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
                 10000L);
         result_status = Status::create(result.status);
     }
+    std::shared_ptr<LoadBlockQueue> load_block_queue;
     {
         std::lock_guard<std::mutex> l(_lock);
         auto it = _load_block_queues.find(instance_id);
         if (it != _load_block_queues.end()) {
-            auto& load_block_queue = it->second;
+            load_block_queue = it->second;
             if (!status.ok()) {
                 load_block_queue->cancel(status);
             }
@@ -357,7 +361,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
     // result_status: commit txn result
     if (status.ok() && st.ok() &&
         (result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
-        RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(txn_id));
+        RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(
+                txn_id, load_block_queue->block_queue_pre_allocated.load()));
         RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id));
     } else {
         std::string wal_path;
@@ -443,8 +448,9 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_i
         }
         group_commit_table = _table_map[table_id];
     }
-    return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id,
-                                                          load_block_queue, be_exe_version);
+    RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
+            table_id, base_schema_version, load_id, load_block_queue, be_exe_version));
+    return Status::OK();
 }
 
 Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
@@ -461,11 +467,14 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i
     }
     return group_commit_table->get_load_block_queue(instance_id, load_block_queue);
 }
+
 Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
                                   const std::string& import_label, WalManager* wal_manager,
                                   std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
+    RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->add_wal_path(db_id, tb_id, wal_id,
+                                                                    import_label, wal_base_path));
     _v_wal_writer = std::make_shared<vectorized::VWalWriter>(
-            db_id, tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version);
+            tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version);
     return _v_wal_writer->init();
 }
 
@@ -475,4 +484,67 @@ Status LoadBlockQueue::close_wal() {
     }
     return Status::OK();
 }
+
+bool LoadBlockQueue::has_enough_wal_disk_space(
+        const std::vector<std::shared_ptr<vectorized::Block>>& blocks, const TUniqueId& load_id,
+        bool is_blocks_contain_all_load_data) {
+    size_t blocks_size = 0;
+    for (auto block : blocks) {
+        blocks_size += block->bytes();
+    }
+    size_t content_length = 0;
+    Status st = ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id, &content_length);
+    if (st.ok()) {
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->group_commit_mgr()->remove_load_info(load_id));
+    } else {
+        return Status::InternalError("can not find load id.");
+    }
+    size_t pre_allocated = is_blocks_contain_all_load_data
+                                   ? blocks_size
+                                   : (blocks_size > content_length ? blocks_size : content_length);
+    auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
+    size_t available_bytes = 0;
+    {
+        st = wal_mgr->get_wal_dir_available_size(wal_base_path, &available_bytes);
+        if (!st.ok()) {
+            LOG(WARNING) << "get wal disk available size filed!";
+        }
+    }
+    if (pre_allocated < available_bytes) {
+        st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, pre_allocated, true);
+        if (!st.ok()) {
+            LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << 
st.to_string();
+        }
+        block_queue_pre_allocated.fetch_add(pre_allocated);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t content_length) {
+    std::unique_lock l(_load_info_lock);
+    if (_load_id_to_content_length_map.find(load_id) == _load_id_to_content_length_map.end()) {
+        _load_id_to_content_length_map.insert(std::make_pair(load_id, content_length));
+    }
+    return Status::OK();
+}
+
+Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t* content_length) {
+    std::shared_lock l(_load_info_lock);
+    if (_load_id_to_content_length_map.find(load_id) != _load_id_to_content_length_map.end()) {
+        *content_length = _load_id_to_content_length_map[load_id];
+        return Status::OK();
+    }
+    return Status::InternalError("can not find load id!");
+}
+
+Status GroupCommitMgr::remove_load_info(TUniqueId load_id) {
+    std::unique_lock l(_load_info_lock);
+    if (_load_id_to_content_length_map.find(load_id) == _load_id_to_content_length_map.end()) {
+        return Status::InternalError("can not remove load id!");
+    }
+    _load_id_to_content_length_map.erase(load_id);
+    return Status::OK();
+}
 } // namespace doris
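A worked example for the reservation path added above (numbers are illustrative): when the first blocks of an async_mode load arrive, has_enough_wal_disk_space() sizes the reservation as the queued block bytes when the blocks already hold the whole load, otherwise as the larger of the queued bytes and the Content-Length registered through update_load_info(); for instance, 64 MB of queued blocks against a registered 300 MB gives a 300 MB reservation. The reservation is granted only if it is below the directory's currently available space, is accumulated in block_queue_pre_allocated, and is handed back via WalManager::delete_wal(txn_id, block_queue_pre_allocated) after the transaction commits successfully.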
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 6f625eda6ae..35afcc46249 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -20,9 +20,17 @@
 #include <gen_cpp/PaloInternalService_types.h>
 
 #include <atomic>
+#include <condition_variable>
+#include <cstdint>
 #include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include <unordered_map>
+#include <utility>
 
 #include "common/status.h"
+#include "olap/wal_manager.h"
+#include "runtime/exec_env.h"
 #include "util/threadpool.h"
 #include "vec/core/block.h"
 #include "vec/sink/writer/vwal_writer.h"
@@ -45,9 +53,7 @@ public:
              wait_internal_group_commit_finish(wait_internal_group_commit_finish),
               _start_time(std::chrono::steady_clock::now()),
               _all_block_queues_bytes(all_block_queues_bytes),
-              _group_commit_interval_ms(group_commit_interval_ms) {
-        _single_block_queue_bytes = std::make_shared<std::atomic_size_t>(0);
-    };
+              _group_commit_interval_ms(group_commit_interval_ms) {};
 
     Status add_block(std::shared_ptr<vectorized::Block> block, bool write_wal);
     Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
@@ -59,6 +65,8 @@ public:
                      WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc,
                       int be_exe_version);
     Status close_wal();
+    bool has_enough_wal_disk_space(const std::vector<std::shared_ptr<vectorized::Block>>& blocks,
+                                   const TUniqueId& load_id, bool is_blocks_contain_all_load_data);
 
     static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
     UniqueId load_instance_id;
@@ -71,6 +79,8 @@ public:
     bool process_finish = false;
     std::condition_variable internal_group_commit_finish_cv;
     Status status = Status::OK();
+    std::string wal_base_path;
+    std::atomic_size_t block_queue_pre_allocated = 0;
 
 private:
     void _cancel_without_lock(const Status& st);
@@ -84,8 +94,6 @@ private:
 
     // memory consumption of all tables' load block queues, used for back pressure.
     std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
-    // memory consumption of one load block queue, used for correctness check.
-    std::shared_ptr<std::atomic_size_t> _single_block_queue_bytes;
     // group commit interval in ms, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");'
     int64_t _group_commit_interval_ms;
     std::shared_ptr<vectorized::VWalWriter> _v_wal_writer;
@@ -145,6 +153,10 @@ public:
                                       const UniqueId& load_id,
                                      std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                       int be_exe_version);
+    Status update_load_info(TUniqueId load_id, size_t content_length);
+    Status get_load_info(TUniqueId load_id, size_t* content_length);
+    Status remove_load_info(TUniqueId load_id);
+    std::condition_variable cv;
 
 private:
     ExecEnv* _exec_env = nullptr;
@@ -155,6 +167,8 @@ private:
     std::unique_ptr<doris::ThreadPool> _thread_pool;
     // memory consumption of all tables' load block queues, used for back pressure.
     std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
+    std::shared_mutex _load_info_lock;
+    std::unordered_map<TUniqueId, size_t> _load_id_to_content_length_map;
 };
 
 } // namespace doris
\ No newline at end of file
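
The _load_info_lock / _load_id_to_content_length_map members added above implement a small reader-writer-locked lookup table: update_load_info and remove_load_info take the exclusive lock, get_load_info takes the shared lock. A self-contained sketch of that pattern, using a std::string key and std::optional in place of Doris's TUniqueId and Status types (those substitutions are assumptions made for this note):

    // Sketch of a shared_mutex-guarded map (illustrative; not the Doris types).
    #include <cstddef>
    #include <iostream>
    #include <optional>
    #include <shared_mutex>
    #include <string>
    #include <unordered_map>

    class LoadInfoMap {
    public:
        void update(const std::string& load_id, size_t content_length) {
            std::unique_lock lock(_mutex);  // exclusive: writers block everyone
            _map.emplace(load_id, content_length);
        }

        std::optional<size_t> get(const std::string& load_id) const {
            std::shared_lock lock(_mutex);  // shared: readers run concurrently
            auto it = _map.find(load_id);
            if (it == _map.end()) return std::nullopt;
            return it->second;
        }

        bool remove(const std::string& load_id) {
            std::unique_lock lock(_mutex);
            return _map.erase(load_id) > 0;
        }

    private:
        mutable std::shared_mutex _mutex;
        std::unordered_map<std::string, size_t> _map;
    };

    int main() {
        LoadInfoMap loads;
        loads.update("load-1", 4096);
        std::cout << loads.get("load-1").value_or(0) << "\n";  // 4096
        std::cout << loads.remove("load-1") << "\n";           // 1
        return 0;
    }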
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index d4ede45868f..3e38f9c42fc 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -17,6 +17,14 @@
 
 #include "vec/sink/group_commit_block_sink.h"
 
+#include <gen_cpp/DataSinks_types.h>
+
+#include <chrono>
+#include <mutex>
+#include <shared_mutex>
+
+#include "common/exception.h"
+#include "runtime/exec_env.h"
 #include "runtime/group_commit_mgr.h"
 #include "runtime/runtime_state.h"
 #include "util/doris_metrics.h"
@@ -61,6 +69,8 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
 
 Status GroupCommitBlockSink::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(DataSink::prepare(state));
+    RETURN_IF_ERROR(
+            ExecEnv::GetInstance()->group_commit_mgr()->update_load_info(_load_id.to_thrift(), 0));
     _state = state;
 
     // profile must add to state's object pool
@@ -107,7 +117,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
             (double)state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) {
             return Status::DataQualityError("too many filtered rows");
         }
-        RETURN_IF_ERROR(_add_blocks());
+        RETURN_IF_ERROR(_add_blocks(_group_commit_mode != TGroupCommitMode::SYNC_MODE, true));
     }
     if (_load_block_queue) {
         _load_block_queue->remove_load_id(_load_id);
@@ -213,7 +223,7 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
         _blocks.emplace_back(output_block);
     } else {
         if (!_is_block_appended) {
-            RETURN_IF_ERROR(_add_blocks());
+            RETURN_IF_ERROR(_add_blocks(_group_commit_mode != TGroupCommitMode::SYNC_MODE, false));
         }
         RETURN_IF_ERROR(_load_block_queue->add_block(
                 output_block, _group_commit_mode != TGroupCommitMode::SYNC_MODE));
@@ -221,7 +231,7 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
     return Status::OK();
 }
 
-Status GroupCommitBlockSink::_add_blocks() {
+Status GroupCommitBlockSink::_add_blocks(bool write_wal, bool is_blocks_contain_all_load_data) {
     DCHECK(_is_block_appended == false);
     TUniqueId load_id;
     load_id.__set_hi(_load_id.hi);
@@ -231,6 +241,20 @@ Status GroupCommitBlockSink::_add_blocks() {
             RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
                     _db_id, _table_id, _base_schema_version, load_id, _load_block_queue,
                     _state->be_exec_version()));
+            if (write_wal) {
+                _group_commit_mode = _load_block_queue->has_enough_wal_disk_space(
+                                             _blocks, load_id, is_blocks_contain_all_load_data)
+                                             ? TGroupCommitMode::ASYNC_MODE
+                                             : TGroupCommitMode::SYNC_MODE;
+                if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
+                    LOG(INFO)
+                            << "Load label " << _load_block_queue->label
+                            << " will not write wal because wal disk space usage reaches max limit.";
+                } else {
+                    LOG(INFO) << "Load label " << _load_block_queue->label << " will write wal to "
+                              << _load_block_queue->wal_base_path << ".";
+                }
+            }
             _state->set_import_label(_load_block_queue->label);
             _state->set_wal_id(_load_block_queue->txn_id);
         } else {
diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h
index d7c8ed70d6d..c0971f4801a 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -47,7 +47,7 @@ public:
 
 private:
     Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block);
-    Status _add_blocks();
+    Status _add_blocks(bool write_wal, bool is_blocks_contain_all_load_data);
 
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
 
diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp
index d929207e9a9..2dc945a2a2f 100644
--- a/be/src/vec/sink/writer/vwal_writer.cpp
+++ b/be/src/vec/sink/writer/vwal_writer.cpp
@@ -42,11 +42,10 @@
 namespace doris {
 namespace vectorized {
 
-VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id,
-                       const std::string& import_label, WalManager* wal_manager,
-                       std::vector<TSlotDescriptor>& slot_desc, int be_exe_version)
-        : _db_id(db_id),
-          _tb_id(tb_id),
+VWalWriter::VWalWriter(int64_t tb_id, int64_t wal_id, const std::string& import_label,
+                       WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc,
+                       int be_exe_version)
+        : _tb_id(tb_id),
           _wal_id(wal_id),
           _label(import_label),
           _wal_manager(wal_manager),
@@ -56,7 +55,6 @@ VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id,
 VWalWriter::~VWalWriter() {}
 
 Status VWalWriter::init() {
-    RETURN_IF_ERROR(_wal_manager->add_wal_path(_db_id, _tb_id, _wal_id, _label));
     RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer));
     _wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WAL_STATUS::CREATE);
     std::stringstream ss;
diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h
index 17c9dc979a1..324409e9d46 100644
--- a/be/src/vec/sink/writer/vwal_writer.h
+++ b/be/src/vec/sink/writer/vwal_writer.h
@@ -83,7 +83,7 @@ namespace vectorized {
 
 class VWalWriter {
 public:
-    VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label,
+    VWalWriter(int64_t tb_id, int64_t wal_id, const std::string& import_label,
               WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc,
                int be_exe_version);
     ~VWalWriter();
@@ -92,7 +92,6 @@ public:
     Status close();
 
 private:
-    int64_t _db_id;
     int64_t _tb_id;
     int64_t _wal_id;
     uint32_t _version = 0;
diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp
index 75588f4190b..74dcf9f1f3f 100644
--- a/be/test/http/stream_load_test.cpp
+++ b/be/test/http/stream_load_test.cpp
@@ -19,6 +19,8 @@
 
 #include <gtest/gtest.h>
 
+#include <memory>
+
 #include "common/config.h"
 #include "event2/buffer.h"
 #include "event2/event.h"
@@ -34,6 +36,7 @@
 #include "http/http_request.h"
 #include "http/utils.h"
 #include "olap/wal_manager.h"
+#include "runtime/exec_env.h"
 
 namespace doris {
 
@@ -51,10 +54,14 @@ void http_request_done_cb(struct evhttp_request* req, void* arg) {
 
 TEST_F(StreamLoadTest, TestHeader) {
     // 1G
-    config::wal_max_disk_size = 1073741824;
+    auto wal_mgr = WalManager::create_shared(ExecEnv::GetInstance(), config::group_commit_wal_path);
+    static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_1", 1000, 0, 0));
+    static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_2", 10000, 0, 0));
+    static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_3", 100000, 0, 0));
+    ExecEnv::GetInstance()->set_wal_mgr(wal_mgr);
     // 1. empty info
     {
-        auto evhttp_req = evhttp_request_new(nullptr, nullptr);
+        auto* evhttp_req = evhttp_request_new(nullptr, nullptr);
         HttpRequest req(evhttp_req);
         EXPECT_EQ(load_size_smaller_than_wal_limit(&req), false);
         evhttp_request_free(evhttp_req);
@@ -86,7 +93,7 @@ TEST_F(StreamLoadTest, TestHeader) {
         evhttp_req->uri = uri;
         evhttp_req->uri_elems = evhttp_uri_parse(uri);
         evhttp_add_header(evhttp_req->input_headers, HTTP_GROUP_COMMIT.c_str(), "true");
-        evhttp_add_header(evhttp_req->input_headers, HttpHeaders::CONTENT_LENGTH, "1000");
+        evhttp_add_header(evhttp_req->input_headers, HttpHeaders::CONTENT_LENGTH, "20000");
         HttpRequest req(evhttp_req);
         req.init_from_evhttp();
         EXPECT_EQ(load_size_smaller_than_wal_limit(&req), true);
diff --git a/be/test/olap/wal_manager_test.cpp b/be/test/olap/wal_manager_test.cpp
index 17fb3e3e6f4..93d8636eb22 100644
--- a/be/test/olap/wal_manager_test.cpp
+++ b/be/test/olap/wal_manager_test.cpp
@@ -18,6 +18,7 @@
 
 #include <gtest/gtest.h>
 
+#include <cstddef>
 #include <filesystem>
 #include <map>
 #include <string>
@@ -27,6 +28,7 @@
 #include "gen_cpp/HeartbeatService_types.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "io/fs/local_file_system.h"
+#include "olap/options.h"
 #include "runtime/decimalv2_value.h"
 #include "runtime/exec_env.h"
 #include "runtime/result_queue_mgr.h"
@@ -46,8 +48,7 @@ extern Status k_stream_load_plan_status;
 extern std::string k_request_line;
 
 ExecEnv* _env = nullptr;
-std::string wal_dir = "./wal_test";
-std::string tmp_dir = "./wal_test/tmp";
+std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
 
 class WalManagerTest : public testing::Test {
 public:
@@ -63,6 +64,7 @@ public:
         _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
         _env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
         _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env);
+        _env->_store_paths = {StorePath(std::filesystem::current_path(), 0)};
         _env->_wal_manager = WalManager::create_shared(_env, wal_dir);
         k_stream_load_begin_result = TLoadTxnBeginResult();
         k_stream_load_plan_status = Status::OK();
@@ -78,16 +80,14 @@ public:
     void prepare() { static_cast<void>(io::global_local_filesystem()->create_directory(wal_dir)); }
 
     void createWal(const std::string& wal_path) {
-        std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes =
-                std::make_shared<std::atomic_size_t>(0);
-        std::shared_ptr<std::condition_variable> cv = std::make_shared<std::condition_variable>();
-        auto wal_writer = WalWriter(wal_path, _all_wal_disk_bytes, cv);
+        auto wal_writer = WalWriter(wal_path);
         static_cast<void>(wal_writer.init());
         static_cast<void>(wal_writer.finalize());
     }
 };
 
 TEST_F(WalManagerTest, recovery_normal) {
+    _env->wal_mgr()->wal_limit_test_bytes = 1099511627776;
     k_request_line = "{\"Status\": \"Success\",    \"Message\": \"Test\"}";
 
     std::string db_id = "1";
@@ -123,4 +123,93 @@ TEST_F(WalManagerTest, recovery_normal) {
     ASSERT_TRUE(!std::filesystem::exists(wal_200));
     ASSERT_TRUE(!std::filesystem::exists(wal_201));
 }
+
+TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
+    auto wal_mgr = WalManager::create_shared(_env, config::group_commit_wal_path);
+    static_cast<void>(wal_mgr->init());
+    _env->set_wal_mgr(wal_mgr);
+
+    // 1T
+    size_t available_bytes = 1099511627776;
+    size_t wal_limit_bytes;
+
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "0%";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 0);
+
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "5%";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+    wal_limit_bytes = available_bytes * 0.05;
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, wal_limit_bytes);
+
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "50%";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+    wal_limit_bytes = available_bytes * 0.5;
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, wal_limit_bytes);
+
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "200%";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+    wal_limit_bytes = available_bytes * 2;
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, wal_limit_bytes);
+
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "-10%";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::InternalError(""));
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes);
+
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "0";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 0);
+
+    // 1M
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "1048576";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 1048576);
+
+    // 1G
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "1073741824";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 1073741824);
+
+    // 100G
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "107374182400";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 107374182400);
+
+    // 1M
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "1M";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 1048576);
+
+    // 1G
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "1G";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 1073741824);
+
+    // 100G
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "100G";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::OK());
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, 107374182400);
+
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "-1024";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::InternalError(""));
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes);
+
+    _env->wal_mgr()->wal_limit_test_bytes = available_bytes;
+    config::group_commit_wal_max_disk_limit = "-1M";
+    EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(), Status::InternalError(""));
+    EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes);
+}
 } // namespace doris
\ No newline at end of file
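
The TestDynamicWalSpaceLimt cases above fix the accepted forms of group_commit_wal_max_disk_limit: a trailing % means a share of the WAL directory's available space, a bare number means bytes, M and G scale by 2^20 and 2^30, and negative values are rejected. Below is a small standalone parser written against exactly those expectations; the function name parse_wal_limit and the std::optional error signalling are assumptions for this sketch, not the WalManager::_init_wal_dirs_info code.

    // Illustrative parser for "10%", "1048576", "1M", "1G"; a sketch, not the
    // Doris implementation.
    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <string>

    std::optional<int64_t> parse_wal_limit(const std::string& s, int64_t available_bytes) {
        if (s.empty()) return std::nullopt;
        try {
            if (s.back() == '%') {
                int64_t pct = std::stoll(s.substr(0, s.size() - 1));
                if (pct < 0) return std::nullopt;    // "-10%" is rejected
                return available_bytes * pct / 100;  // share of available space
            }
            int64_t unit = 1;
            std::string num = s;
            if (s.back() == 'M') {
                unit = 1LL << 20;
                num = s.substr(0, s.size() - 1);
            } else if (s.back() == 'G') {
                unit = 1LL << 30;
                num = s.substr(0, s.size() - 1);
            }
            int64_t bytes = std::stoll(num);
            if (bytes < 0) return std::nullopt;      // "-1024" / "-1M" are rejected
            return bytes * unit;
        } catch (const std::exception&) {
            return std::nullopt;                     // not a number at all
        }
    }

    int main() {
        const int64_t one_tb = 1099511627776LL;  // the available size used in the test
        std::cout << *parse_wal_limit("5%", one_tb) << "\n";   // 54975581388
        std::cout << *parse_wal_limit("1G", one_tb) << "\n";   // 1073741824
        std::cout << (parse_wal_limit("-1M", one_tb) ? "accepted" : "rejected") << "\n";
        return 0;
    }

Run with the test's 1 TB available size, this prints 54975581388 and 1073741824, matching the 5% and 1G expectations above.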
diff --git a/be/test/olap/wal_reader_writer_test.cpp b/be/test/olap/wal_reader_writer_test.cpp
index 1d1102f350f..d24db728680 100644
--- a/be/test/olap/wal_reader_writer_test.cpp
+++ b/be/test/olap/wal_reader_writer_test.cpp
@@ -90,10 +90,7 @@ void generate_block(PBlock& pblock, int row_index) {
 
 TEST_F(WalReaderWriterTest, TestWriteAndRead1) {
     std::string file_name = _s_test_data_path + "/abcd123.txt";
-    std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes =
-            std::make_shared<std::atomic_size_t>(0);
-    std::shared_ptr<std::condition_variable> cv = std::make_shared<std::condition_variable>();
-    auto wal_writer = WalWriter(file_name, _all_wal_disk_bytes, cv);
+    auto wal_writer = WalWriter(file_name);
     static_cast<void>(wal_writer.init());
     size_t file_len = 0;
     int64_t file_size = -1;
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp
index 81c2bdc4b07..f0b8e8c9643 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -496,7 +496,7 @@ public:
 private:
     ExecEnv* _env = nullptr;
     brpc::Server* _server = nullptr;
-    std::string wal_dir = "./wal_test";
+    std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
 };
 
 TEST_F(VOlapTableSinkTest, normal) {
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp
index a239741a5cb..ac066aceb98 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -57,7 +57,7 @@ private:
     void init_desc_table();
 
     ExecEnv* _env = nullptr;
-    std::string wal_dir = "./wal_test";
+    std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
     int64_t db_id = 1;
     int64_t tb_id = 2;
     int64_t txn_id = 789;
@@ -195,6 +195,7 @@ void VWalScannerTest::init_desc_table() {
 }
 
 void VWalScannerTest::init() {
+    config::group_commit_wal_max_disk_limit = "100M";
     init_desc_table();
     static_cast<void>(io::global_local_filesystem()->create_directory(
             wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id)));
@@ -215,7 +216,9 @@ void VWalScannerTest::init() {
 
     _env = ExecEnv::GetInstance();
     _env->_wal_manager = WalManager::create_shared(_env, wal_dir);
-    auto st = _env->_wal_manager->add_wal_path(db_id, tb_id, txn_id, label);
+    std::string base_path;
+    auto st = _env->_wal_manager->_init_wal_dirs_info();
+    st = _env->_wal_manager->add_wal_path(db_id, tb_id, txn_id, label, base_path);
 }
 
 TEST_F(VWalScannerTest, normal) {
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
index f5cb97fbae8..9a081aa2d59 100644
--- a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy
@@ -70,7 +70,7 @@ suite("test_group_commit_and_wal_back_pressure", "p2") {
                 unset 'label'
 
                 file 'test_group_commit_and_wal_back_pressure.csv.gz'
-                time 100000 
+                time 600000 
             }
         })
     }
@@ -88,7 +88,7 @@ suite("test_group_commit_and_wal_back_pressure", "p2") {
                 unset 'label'
 
                 file 'test_group_commit_and_wal_back_pressure.csv.gz'
-                time 100000 
+                time 600000 
             }
         })
     }
@@ -106,7 +106,7 @@ suite("test_group_commit_and_wal_back_pressure", "p2") {
                 unset 'label'
 
                 file 'test_group_commit_and_wal_back_pressure.csv.gz'
-                time 100000 
+                time 600000 
             }
         })
     }
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy
index 24b66de04b8..7361faf72df 100644
--- a/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_wal_limit.groovy
@@ -41,7 +41,7 @@ suite("test_group_commit_wal_limit") {
     // normal case
     StringBuilder strBuilder = new StringBuilder()
     strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
-    strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" " )
+    strBuilder.append(" -H \"group_commit:async_mode\" -H \"column_separator:,\" " )
     strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" " )
     strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
     strBuilder.append(" http://" + context.config.feHttpAddress + "/api/${db}/${tableName}/_stream_load")
@@ -55,49 +55,13 @@ suite("test_group_commit_wal_limit") {
     logger.info("out is " + out )
     assertTrue(out.contains('group_commit'))
 
-    // chunked data case
-    strBuilder = new StringBuilder()
-    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
-    strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" " )
-    strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" " )
-    strBuilder.append(" -H \"Content-Length:0\" " )
-    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
-    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/${db}/${tableName}/_stream_load")
-
-    command = strBuilder.toString()
-    logger.info("command is " + command)
-    process = ['bash','-c',command].execute() 
-    code = process.waitFor()
-    assertEquals(code, 0)
-    out = process.text
-    logger.info("out is " + out )
-    assertTrue(out.contains('Stream load size too large'))
-
-    // too lagre data case 1TB
-    strBuilder = new StringBuilder()
-    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
-    strBuilder.append(" -H \"group_commit:true\" -H \"column_separator:,\" " )
-    strBuilder.append(" -H \"compress_type:gz\" -H \"format:csv\" " )
-    strBuilder.append(" -H \"Content-Length:1099511627776\" " )
-    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
-    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/${db}/${tableName}/_stream_load")
-
-    command = strBuilder.toString()
-    logger.info("command is " + command)
-    process = ['bash','-c',command].execute() 
-    code = process.waitFor()
-    assertEquals(code, 0)
-    out = process.text
-    logger.info("out is " + out )
-    assertTrue(out.contains('Stream load size too large'))
-
     // httpload 
     // normal case
     strBuilder = new StringBuilder()
     strBuilder.append("curl -v --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
     String sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" "
     strBuilder.append(sql)
-    strBuilder.append(" -H \"group_commit:true\"") 
+    strBuilder.append(" -H \"group_commit:async_mode\"") 
     strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
     strBuilder.append(" http://" + context.config.feHttpAddress + "/api/_http_stream")
 
@@ -109,40 +73,4 @@ suite("test_group_commit_wal_limit") {
     out = process.text
     logger.info("out is " + out )
     assertTrue(out.contains('group_commit'))
-
-    // chunked data case
-    strBuilder = new StringBuilder()
-    strBuilder.append("curl -v --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
-    sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" "
-    strBuilder.append(sql)
-    strBuilder.append(" -H \"group_commit:true\" -H \"Content-Length:0\"") 
-    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
-    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/_http_stream")
-
-    command = strBuilder.toString()
-    logger.info("command is " + command)
-    process = ['bash','-c',command].execute() 
-    code = process.waitFor()
-    assertEquals(code, 0)
-    out = process.text
-    logger.info("out is " + out )
-    assertTrue(out.contains('Http load size too large'))
-
-    // too lagre data case 1TB
-    strBuilder = new StringBuilder()
-    strBuilder.append("curl -v --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
-    sql = " -H \"sql:insert into " + db + "." + tableName + " (k,v) select c1, c2 from http_stream(\\\"format\\\" = \\\"csv\\\", \\\"column_separator\\\" = \\\",\\\", \\\"compress_type\\\" = \\\"gz\\\" ) \" "
-    strBuilder.append(sql)
-    strBuilder.append(" -H \"group_commit:true\" -H \"Content-Length:1099511627776\"") 
-    strBuilder.append(" -T " + context.config.dataPath + "/load_p0/stream_load/test_group_commit_wal_limit.csv.gz")
-    strBuilder.append(" http://" + context.config.feHttpAddress + "/api/_http_stream")
-
-    command = strBuilder.toString()
-    logger.info("command is " + command)
-    process = ['bash','-c',command].execute() 
-    code = process.waitFor()
-    assertEquals(code, 0)
-    out = process.text
-    logger.info("out is " + out )
-    assertTrue(out.contains('Http load size too large'))
 }

