This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7a030a2e5ef5cbd233f0b2d5203adb333688cdb5
Author: TengJianPing <[email protected]>
AuthorDate: Tue Apr 16 15:46:13 2024 +0800

    [improvement](spill) improve config and fix spill bugs (#33519)
---
 be/src/common/config.cpp                           |   6 +-
 be/src/common/config.h                             |  11 +-
 be/src/olap/storage_engine.h                       |   2 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |   9 +-
 .../partitioned_aggregation_source_operator.cpp    |  13 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  24 ++--
 .../exec/partitioned_hash_join_sink_operator.cpp   |   6 +-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  21 ++--
 .../pipeline/exec/spill_sort_source_operator.cpp   |  13 +-
 be/src/runtime/exec_env_init.cpp                   |   8 +-
 .../workload_group/workload_group_manager.cpp      |  10 +-
 be/src/vec/spill/spill_stream_manager.cpp          | 134 +++++++++++++++------
 be/src/vec/spill/spill_stream_manager.h            |  53 ++++----
 be/src/vec/spill/spill_writer.cpp                  |  14 +--
 .../java/org/apache/doris/qe/SessionVariable.java  |  42 ++++++-
 15 files changed, 247 insertions(+), 119 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f29ec307914..8e856977888 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1160,13 +1160,13 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, 
"0.15");
 DEFINE_Int32(partition_disk_index_lru_size, "10000");
 // limit the storage space that query spill files can use
 DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage");
-DEFINE_mInt64(spill_storage_limit, "10737418240"); // 10G
-DEFINE_mInt32(spill_gc_interval_ms, "2000");       // 2s
+DEFINE_String(spill_storage_limit, "20%");   // 20%
+DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
+DEFINE_mInt32(spill_gc_file_count, "2000");
 DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2");
 DEFINE_Int32(spill_io_thread_pool_queue_size, "1024");
 DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2");
 DEFINE_Int32(spill_async_task_thread_pool_queue_size, "1024");
-DEFINE_mInt32(spill_mem_warning_water_mark_multiplier, "2");
 
 DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 81fabfb9517..191ebcc4f3b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1242,13 +1242,20 @@ DECLARE_mDouble(high_disk_avail_level_diff_usages);
 // create tablet in partition random robin idx lru size, default 10000
 DECLARE_Int32(partition_disk_index_lru_size);
 DECLARE_String(spill_storage_root_path);
-DECLARE_mInt64(spill_storage_limit);
+// Spill storage limit specified as number of bytes
+// ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
+// or percentage of capaity ('<int>%').
+// Defaults to bytes if no unit is given.
+// Must larger than 0.
+// If specified as percentage, the final limit value is:
+//   disk_capacity_bytes * storage_flood_stage_usage_percent * 
spill_storage_limit
+DECLARE_String(spill_storage_limit);
 DECLARE_mInt32(spill_gc_interval_ms);
+DECLARE_mInt32(spill_gc_file_count);
 DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num);
 DECLARE_Int32(spill_io_thread_pool_queue_size);
 DECLARE_Int32(spill_async_task_thread_pool_thread_num);
 DECLARE_Int32(spill_async_task_thread_pool_queue_size);
-DECLARE_mInt32(spill_mem_warning_water_mark_multiplier);
 
 DECLARE_mBool(check_segment_when_build_rowset_meta);
 
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6888af9ee17..362f899c81a 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -170,7 +170,7 @@ public:
     // get all info of root_path
     Status get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos, 
bool need_update);
 
-    int64_t get_file_or_directory_size(const std::string& file_path);
+    static int64_t get_file_or_directory_size(const std::string& file_path);
 
     // get root path for creating tablet. The returned vector of root path 
should be round robin,
     // for avoiding that all the tablet would be deployed one disk.
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 4ea531bade0..55a0650dc1f 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -249,14 +249,15 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
 
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
+    auto query_id = state->query_id();
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
     status = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
-            [this, &parent, state, execution_context, submit_timer] {
+            [this, &parent, state, query_id, execution_context, submit_timer] {
                 auto execution_context_lock = execution_context.lock();
                 if (!execution_context_lock) {
-                    LOG(INFO) << "query " << print_id(state->query_id())
+                    LOG(INFO) << "query " << print_id(query_id)
                               << " execution_context released, maybe query was 
cancelled.";
                     return Status::Cancelled("Cancelled");
                 }
@@ -267,13 +268,13 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
                     if (!_shared_state->sink_status.ok() || 
state->is_cancelled()) {
                         if (!_shared_state->sink_status.ok()) {
                             LOG(WARNING)
-                                    << "query " << print_id(state->query_id()) 
<< " agg node "
+                                    << "query " << print_id(query_id) << " agg 
node "
                                     << Base::_parent->id()
                                     << " revoke_memory error: " << 
Base::_shared_state->sink_status;
                         }
                         _shared_state->close();
                     } else {
-                        VLOG_DEBUG << "query " << print_id(state->query_id()) 
<< " agg node "
+                        VLOG_DEBUG << "query " << print_id(query_id) << " agg 
node "
                                    << Base::_parent->id() << " revoke_memory 
finish"
                                    << ", eos: " << _eos;
                     }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index a2484cd6db4..a5753ef7654 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -205,16 +205,17 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
 
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
+    auto query_id = state->query_id();
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
     RETURN_IF_ERROR(
             
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
-                    [this, state, execution_context, submit_timer] {
+                    [this, state, query_id, execution_context, submit_timer] {
                         auto execution_context_lock = execution_context.lock();
                         if (!execution_context_lock) {
-                            LOG(INFO) << "query " << 
print_id(state->query_id())
+                            LOG(INFO) << "query " << print_id(query_id)
                                       << " execution_context released, maybe 
query was cancelled.";
                             // FIXME: return status is meaningless?
                             return Status::Cancelled("Cancelled");
@@ -225,14 +226,14 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                         Defer defer {[&]() {
                             if (!_status.ok() || state->is_cancelled()) {
                                 if (!_status.ok()) {
-                                    LOG(WARNING) << "query " << 
print_id(state->query_id())
-                                                 << " agg node " << 
_parent->node_id()
+                                    LOG(WARNING) << "query " << 
print_id(query_id) << " agg node "
+                                                 << _parent->node_id()
                                                  << " merge spilled agg data 
error: " << _status;
                                 }
                                 _shared_state->close();
                             } else if 
(_shared_state->spill_partitions.empty()) {
-                                VLOG_DEBUG << "query " << 
print_id(state->query_id())
-                                           << " agg node " << 
_parent->node_id()
+                                VLOG_DEBUG << "query " << print_id(query_id) 
<< " agg node "
+                                           << _parent->node_id()
                                            << " merge spilled agg data finish";
                             }
                             
Base::_shared_state->in_mem_shared_state->aggregate_data_container
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 0f837f8bbda..35750ee5a55 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -493,6 +493,7 @@ Status 
PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) {
     // to avoid prepare _child_x twice
     auto child_x = std::move(_child_x);
     RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state));
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, 
*_intermediate_row_desc));
     RETURN_IF_ERROR(_inner_probe_operator->set_child(child_x));
     DCHECK(_build_side_child != nullptr);
     _inner_probe_operator->set_build_side_child(_build_side_child);
@@ -648,6 +649,8 @@ Status 
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
         }
     }
 
+    const auto partition_index = local_state._partition_cursor;
+    auto& probe_blocks = local_state._probe_blocks[partition_index];
     if (local_state._need_to_setup_internal_operators) {
         *eos = false;
         bool has_data = false;
@@ -659,12 +662,13 @@ Status 
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
         }
         RETURN_IF_ERROR(_setup_internal_operators(local_state, state));
         local_state._need_to_setup_internal_operators = false;
+        auto& mutable_block = local_state._partitioned_blocks[partition_index];
+        if (mutable_block && !mutable_block->empty()) {
+            probe_blocks.emplace_back(mutable_block->to_block());
+        }
     }
-
-    auto partition_index = local_state._partition_cursor;
-    bool in_mem_eos_;
+    bool in_mem_eos = false;
     auto* runtime_state = local_state._runtime_state.get();
-    auto& probe_blocks = local_state._probe_blocks[partition_index];
     while (_inner_probe_operator->need_more_input_data(runtime_state)) {
         if (probe_blocks.empty()) {
             *eos = false;
@@ -682,14 +686,16 @@ Status 
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
 
         auto block = std::move(probe_blocks.back());
         probe_blocks.pop_back();
-        RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, 
false));
+        if (!block.empty()) {
+            RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, 
false));
+        }
     }
 
     
RETURN_IF_ERROR(_inner_probe_operator->pull(local_state._runtime_state.get(), 
output_block,
-                                                &in_mem_eos_));
+                                                &in_mem_eos));
 
     *eos = false;
-    if (in_mem_eos_) {
+    if (in_mem_eos) {
         local_state._partition_cursor++;
         if (local_state._partition_cursor == _partition_count) {
             *eos = true;
@@ -829,6 +835,10 @@ Status 
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
             RETURN_IF_ERROR(local_state.finish_spilling(0));
         }
 
+        if (local_state._child_block->rows() == 0 && !local_state._child_eos) {
+            return Status::OK();
+        }
+
         Defer defer([&] { local_state._child_block->clear_column_data(); });
         if (need_to_spill) {
             SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 370606b1904..a0adf0505f8 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -93,6 +93,8 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
     DCHECK_EQ(_spilling_streams_count, 0);
 
     if (!_shared_state->need_to_spill) {
+        profile()->add_info_string("Spilled", "true");
+        _shared_state->need_to_spill = true;
         auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
         _shared_state->inner_shared_state->hash_table_variants.reset();
         auto row_desc = p._child_x->row_desc();
@@ -172,7 +174,6 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
     }
 
     if (_spilling_streams_count > 0) {
-        _shared_state->need_to_spill = true;
         std::unique_lock<std::mutex> lock(_spill_lock);
         if (_spilling_streams_count > 0) {
             _dependency->block();
@@ -202,7 +203,8 @@ Status 
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
     SCOPED_TIMER(_partition_shuffle_timer);
     auto* channel_ids = 
reinterpret_cast<uint64_t*>(_partitioner->get_channel_ids());
     std::vector<uint32_t> partition_indexes[p._partition_count];
-    for (uint32_t i = 0; i != rows; ++i) {
+    DCHECK_LT(begin, end);
+    for (size_t i = begin; i != end; ++i) {
         partition_indexes[channel_ids[i]].emplace_back(i);
     }
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 78c5c9f51e9..cc80fc205d7 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -106,9 +106,10 @@ Status 
SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
     auto* sink_local_state = _runtime_state->get_sink_local_state();
     DCHECK(sink_local_state != nullptr);
 
-    _profile->add_info_string("TOP-N", 
*sink_local_state->profile()->get_info_string("TOP-N"));
+    RETURN_IF_ERROR(sink_local_state->open(state));
 
-    return sink_local_state->open(state);
+    _profile->add_info_string("TOP-N", 
*sink_local_state->profile()->get_info_string("TOP-N"));
+    return Status::OK();
 }
 
 SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int 
operator_id,
@@ -231,6 +232,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state) {
 
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
+    auto query_id = state->query_id();
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
@@ -238,10 +240,11 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
     status = ExecEnv::GetInstance()
                      ->spill_stream_mgr()
                      
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
-                     ->submit_func([this, state, &parent, execution_context, 
submit_timer] {
+                     ->submit_func([this, state, query_id, &parent, 
execution_context,
+                                    submit_timer] {
                          auto execution_context_lock = 
execution_context.lock();
                          if (!execution_context_lock) {
-                             LOG(INFO) << "query " << 
print_id(state->query_id())
+                             LOG(INFO) << "query " << print_id(query_id)
                                        << " execution_context released, maybe 
query was cancelled.";
                              return Status::OK();
                          }
@@ -251,16 +254,14 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
                          Defer defer {[&]() {
                              if (!_shared_state->sink_status.ok() || 
state->is_cancelled()) {
                                  if (!_shared_state->sink_status.ok()) {
-                                     LOG(WARNING) << "query " << 
print_id(state->query_id())
-                                                  << " sort node " << 
_parent->id()
-                                                  << " revoke memory error: "
+                                     LOG(WARNING) << "query " << 
print_id(query_id) << " sort node "
+                                                  << _parent->id() << " revoke 
memory error: "
                                                   << 
_shared_state->sink_status;
                                  }
                                  _shared_state->close();
                              } else {
-                                 VLOG_DEBUG << "query " << 
print_id(state->query_id())
-                                            << " sort node " << _parent->id()
-                                            << " revoke memory finish";
+                                 VLOG_DEBUG << "query " << print_id(query_id) 
<< " sort node "
+                                            << _parent->id() << " revoke 
memory finish";
                              }
 
                              
_spilling_stream->end_spill(_shared_state->sink_status);
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp 
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 5edb0daf7fc..107868f968d 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -85,14 +85,15 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
 
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
+    auto query_id = state->query_id();
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
-    auto spill_func = [this, state, &parent, execution_context, submit_timer] {
+    auto spill_func = [this, state, query_id, &parent, execution_context, 
submit_timer] {
         auto execution_context_lock = execution_context.lock();
         if (!execution_context_lock) {
-            LOG(INFO) << "query " << print_id(state->query_id())
+            LOG(INFO) << "query " << print_id(query_id)
                       << " execution_context released, maybe query was 
cancelled.";
             return Status::OK();
         }
@@ -103,7 +104,7 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
         Defer defer {[&]() {
             if (!_status.ok() || state->is_cancelled()) {
                 if (!_status.ok()) {
-                    LOG(WARNING) << "query " << print_id(state->query_id()) << 
" sort node "
+                    LOG(WARNING) << "query " << print_id(query_id) << " sort 
node "
                                  << _parent->node_id() << " merge spill data 
error: " << _status;
                 }
                 _shared_state->close();
@@ -112,8 +113,8 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                 }
                 _current_merging_streams.clear();
             } else {
-                VLOG_DEBUG << "query " << print_id(state->query_id()) << " 
sort node "
-                           << _parent->node_id() << " merge spill data finish";
+                VLOG_DEBUG << "query " << print_id(query_id) << " sort node " 
<< _parent->node_id()
+                           << " merge spill data finish";
             }
             _dependency->Dependency::set_ready();
         }};
@@ -121,7 +122,7 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
         vectorized::SpillStreamSPtr tmp_stream;
         while (!state->is_cancelled()) {
             int max_stream_count = _calc_spill_blocks_to_merge();
-            VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort 
node " << _parent->id()
+            VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << 
_parent->id()
                        << " merge spill streams, streams count: "
                        << _shared_state->sorted_streams.size()
                        << ", curren merge max stream count: " << 
max_stream_count;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index e475ed10d5c..76c877c3695 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -151,6 +151,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     if (ready()) {
         return Status::OK();
     }
+    std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>> 
spill_store_map;
+    for (const auto& spill_path : spill_store_paths) {
+        spill_store_map.emplace(spill_path.path, 
std::make_unique<vectorized::SpillDataDir>(
+                                                         spill_path.path, 
spill_path.capacity_bytes,
+                                                         
spill_path.storage_medium));
+    }
     init_doris_metrics(store_paths);
     _store_paths = store_paths;
     _tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(_store_paths);
@@ -246,7 +252,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     _wal_manager = WalManager::create_shared(this, 
config::group_commit_wal_path);
     _dns_cache = new DNSCache();
     _write_cooldown_meta_executors = 
std::make_unique<WriteCooldownMetaExecutors>();
-    _spill_stream_mgr = new vectorized::SpillStreamManager(spill_store_paths);
+    _spill_stream_mgr = new 
vectorized::SpillStreamManager(std::move(spill_store_map));
     _backend_client_cache->init_metrics("backend");
     _frontend_client_cache->init_metrics("frontend");
     _broker_client_cache->init_metrics("broker");
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 78e2cd1fe06..2e75218e9cf 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -174,7 +174,6 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
     }
 
     auto process_mem_used = doris::MemInfo::proc_mem_no_allocator_cache();
-    auto sys_mem_available = doris::MemInfo::sys_mem_available();
     if (proc_vm_rss < all_queries_mem_used) {
         all_queries_mem_used = proc_vm_rss;
     }
@@ -185,6 +184,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
     // we count these cache memories equally on workload groups.
     double ratio = (double)proc_vm_rss / (double)all_queries_mem_used;
     if (ratio >= 1.25) {
+        auto sys_mem_available = doris::MemInfo::sys_mem_available();
         std::string debug_msg = fmt::format(
                 "\nProcess Memory Summary: process_vm_rss: {}, process mem: 
{}, sys mem available: "
                 "{}, all quries mem: {}",
@@ -192,7 +192,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
                 PrettyPrinter::print(process_mem_used, TUnit::BYTES),
                 PrettyPrinter::print(sys_mem_available, TUnit::BYTES),
                 PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES));
-        LOG_EVERY_N(INFO, 10) << debug_msg;
+        VLOG_EVERY_N(1, 10) << debug_msg;
     }
 
     for (auto& wg : _workload_groups) {
@@ -229,8 +229,10 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
                     PrettyPrinter::print(query_weighted_mem_limit, 
TUnit::BYTES));
 
             debug_msg += "\n  Query Memory Summary:";
+        } else {
+            continue;
         }
-        // check where queries need to revoke memory for task group
+        // check whether queries need to revoke memory for task group
         for (const auto& query : wg_queries) {
             auto query_ctx = query.second.lock();
             if (!query_ctx) {
@@ -253,7 +255,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
             }
         }
         if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) {
-            LOG_EVERY_N(INFO, 10) << debug_msg;
+            VLOG_EVERY_N(1, 10) << debug_msg;
         }
     }
 }
diff --git a/be/src/vec/spill/spill_stream_manager.cpp 
b/be/src/vec/spill/spill_stream_manager.cpp
index 4abca15082c..0259bef33f3 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -27,43 +27,48 @@
 #include <random>
 #include <string>
 
+#include "common/logging.h"
 #include "io/fs/file_system.h"
 #include "io/fs/local_file_system.h"
 #include "olap/olap_define.h"
 #include "runtime/runtime_state.h"
+#include "util/parse_util.h"
+#include "util/pretty_printer.h"
 #include "util/runtime_profile.h"
 #include "util/time.h"
 #include "vec/spill/spill_stream.h"
 
 namespace doris::vectorized {
 
-SpillStreamManager::SpillStreamManager(const std::vector<StorePath>& paths)
-        : _spill_store_paths(paths), _stop_background_threads_latch(1) {}
+SpillStreamManager::SpillStreamManager(
+        std::unordered_map<std::string, 
std::unique_ptr<vectorized::SpillDataDir>>&&
+                spill_store_map)
+        : _spill_store_map(std::move(spill_store_map)), 
_stop_background_threads_latch(1) {}
 
 Status SpillStreamManager::init() {
     LOG(INFO) << "init spill stream manager";
-    RETURN_NOT_OK_STATUS_WITH_WARN(_init_spill_store_map(), 
"_init_spill_store_map failed");
+    RETURN_IF_ERROR(_init_spill_store_map());
 
     int spill_io_thread_count = 
config::spill_io_thread_pool_per_disk_thread_num;
     if (spill_io_thread_count <= 0) {
         spill_io_thread_count = 2;
     }
     int pool_idx = 0;
-    for (const auto& path : _spill_store_paths) {
-        auto gc_dir_root_dir = fmt::format("{}/{}", path.path, 
SPILL_GC_DIR_PREFIX);
+    for (const auto& [path, store] : _spill_store_map) {
+        auto gc_dir_root_dir = fmt::format("{}/{}", path, SPILL_GC_DIR_PREFIX);
         bool exists = true;
         RETURN_IF_ERROR(io::global_local_filesystem()->exists(gc_dir_root_dir, 
&exists));
         if (!exists) {
             
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(gc_dir_root_dir));
         }
 
-        auto spill_dir = fmt::format("{}/{}", path.path, SPILL_DIR_PREFIX);
+        auto spill_dir = fmt::format("{}/{}", path, SPILL_DIR_PREFIX);
         RETURN_IF_ERROR(io::global_local_filesystem()->exists(spill_dir, 
&exists));
         if (!exists) {
             
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(spill_dir));
         } else {
             auto suffix = ToStringFromUnixMillis(UnixMillis());
-            auto gc_dir = fmt::format("{}/{}/{}", path.path, 
SPILL_GC_DIR_PREFIX, suffix);
+            auto gc_dir = fmt::format("{}/{}/{}", path, SPILL_GC_DIR_PREFIX, 
suffix);
             if (std::filesystem::exists(gc_dir)) {
                 LOG(WARNING) << "gc dir already exists: " << gc_dir;
             }
@@ -78,7 +83,7 @@ Status SpillStreamManager::init() {
                 spill_data_size += dir_entry.file_size();
             }
         }
-        path_to_spill_data_size_[path.path] = spill_data_size;
+        store->update_spill_data_usage(spill_data_size);
 
         std::unique_ptr<ThreadPool> io_pool;
         
static_cast<void>(ThreadPoolBuilder(fmt::format("SpillIOThreadPool-{}", 
pool_idx++))
@@ -86,7 +91,7 @@ Status SpillStreamManager::init() {
                                   .set_max_threads(spill_io_thread_count)
                                   
.set_max_queue_size(config::spill_io_thread_pool_queue_size)
                                   .build(&io_pool));
-        path_to_io_thread_pool_[path.path] = std::move(io_pool);
+        path_to_io_thread_pool_[path] = std::move(io_pool);
     }
     static_cast<void>(ThreadPoolBuilder("SpillAsyncTaskThreadPool")
                               
.set_min_threads(config::spill_async_task_thread_pool_thread_num)
@@ -105,21 +110,16 @@ Status SpillStreamManager::init() {
 void SpillStreamManager::_spill_gc_thread_callback() {
     while (!_stop_background_threads_latch.wait_for(
             std::chrono::milliseconds(config::spill_gc_interval_ms))) {
-        gc(2000);
+        gc(config::spill_gc_file_count);
+        for (auto& [path, dir] : _spill_store_map) {
+            static_cast<void>(dir->update_capacity());
+        }
     }
 }
 
 Status SpillStreamManager::_init_spill_store_map() {
-    for (const auto& path : _spill_store_paths) {
-        auto store =
-                std::make_unique<SpillDataDir>(path.path, path.capacity_bytes, 
path.storage_medium);
-        auto st = store->init();
-        if (!st.ok()) {
-            LOG(WARNING) << "Store load failed, status=" << st.to_string()
-                         << ", path=" << store->path();
-            return st;
-        }
-        _spill_store_map.emplace(store->path(), std::move(store));
+    for (const auto& store : _spill_store_map) {
+        RETURN_IF_ERROR(store.second->init());
     }
 
     return Status::OK();
@@ -133,18 +133,23 @@ std::vector<SpillDataDir*> 
SpillStreamManager::_get_stores_for_spill(
             stores.push_back(store.get());
         }
     }
+    if (stores.empty()) {
+        return stores;
+    }
 
-    std::sort(stores.begin(), stores.end(),
-              [](SpillDataDir* a, SpillDataDir* b) { return a->get_usage(0) < 
b->get_usage(0); });
+    std::sort(stores.begin(), stores.end(), [](SpillDataDir* a, SpillDataDir* 
b) {
+        return a->_get_disk_usage(0) < b->_get_disk_usage(0);
+    });
 
     size_t seventy_percent_index = stores.size();
     size_t eighty_five_percent_index = stores.size();
     for (size_t index = 0; index < stores.size(); index++) {
         // If the usage of the store is less than 70%, we choose disk randomly.
-        if (stores[index]->get_usage(0) > 0.7 && seventy_percent_index == 
stores.size()) {
+        if (stores[index]->_get_disk_usage(0) > 0.7 && seventy_percent_index 
== stores.size()) {
             seventy_percent_index = index;
         }
-        if (stores[index]->get_usage(0) > 0.85 && eighty_five_percent_index == 
stores.size()) {
+        if (stores[index]->_get_disk_usage(0) > 0.85 &&
+            eighty_five_percent_index == stores.size()) {
             eighty_five_percent_index = index;
             break;
         }
@@ -153,9 +158,13 @@ std::vector<SpillDataDir*> 
SpillStreamManager::_get_stores_for_spill(
     std::random_device rd;
     std::mt19937 g(rd());
     std::shuffle(stores.begin(), stores.begin() + seventy_percent_index, g);
-    std::shuffle(stores.begin() + seventy_percent_index, stores.begin() + 
eighty_five_percent_index,
-                 g);
-    std::shuffle(stores.begin() + eighty_five_percent_index, stores.end(), g);
+    if (seventy_percent_index != stores.size()) {
+        std::shuffle(stores.begin() + seventy_percent_index,
+                     stores.begin() + eighty_five_percent_index, g);
+    }
+    if (eighty_five_percent_index != stores.size()) {
+        std::shuffle(stores.begin() + eighty_five_percent_index, stores.end(), 
g);
+    }
 
     return stores;
 }
@@ -210,8 +219,8 @@ void SpillStreamManager::gc(int64_t max_file_count) {
 
     bool exists = true;
     int64_t count = 0;
-    for (const auto& path : _spill_store_paths) {
-        std::string gc_root_dir = fmt::format("{}/{}", path.path, 
SPILL_GC_DIR_PREFIX);
+    for (const auto& [path, store_dir] : _spill_store_map) {
+        std::string gc_root_dir = fmt::format("{}/{}", path, 
SPILL_GC_DIR_PREFIX);
 
         std::error_code ec;
         exists = std::filesystem::exists(gc_root_dir, ec);
@@ -243,7 +252,7 @@ void SpillStreamManager::gc(int64_t max_file_count) {
             }
 
             int64_t data_size = 0;
-            Defer defer {[&]() { update_usage(path.path, -data_size); }};
+            Defer defer {[&]() { 
store_dir->update_spill_data_usage(-data_size); }};
 
             for (const auto& file : files) {
                 auto abs_file_path = fmt::format("{}/{}", abs_dir, 
file.file_name);
@@ -257,11 +266,10 @@ void SpillStreamManager::gc(int64_t max_file_count) {
     }
 }
 
-SpillDataDir::SpillDataDir(const std::string& path, int64_t capacity_bytes,
+SpillDataDir::SpillDataDir(std::string path, int64_t capacity_bytes,
                            TStorageMedium::type storage_medium)
-        : _path(path),
-          _available_bytes(0),
-          _disk_capacity_bytes(0),
+        : _path(std::move(path)),
+          _disk_capacity_bytes(capacity_bytes),
           _storage_medium(storage_medium) {}
 
 Status SpillDataDir::init() {
@@ -271,11 +279,45 @@ Status SpillDataDir::init() {
         RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, 
path={}", _path),
                                        "check file exist failed");
     }
+    RETURN_IF_ERROR(update_capacity());
+    LOG(INFO) << fmt::format(
+            "spill storage path: {}, capacity: {}, limit: {}, available: "
+            "{}",
+            _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
+            PrettyPrinter::print_bytes(_spill_data_limit_bytes),
+            PrettyPrinter::print_bytes(_available_bytes));
+    return Status::OK();
+}
+
+Status SpillDataDir::update_capacity() {
+    std::lock_guard<std::mutex> l(_mutex);
+    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, 
&_disk_capacity_bytes,
+                                                                  
&_available_bytes));
+    auto disk_use_max_bytes = (int64_t)(_disk_capacity_bytes *
+                                        
config::storage_flood_stage_usage_percent / (double)100);
+    bool is_percent = true;
+    _spill_data_limit_bytes = 
ParseUtil::parse_mem_spec(config::spill_storage_limit, -1,
+                                                        _disk_capacity_bytes, 
&is_percent);
+    if (_spill_data_limit_bytes <= 0) {
+        auto err_msg = fmt::format("Failed to parse spill storage limit from 
'{}'",
+                                   config::spill_storage_limit);
+        LOG(WARNING) << err_msg;
+        return Status::InvalidArgument(err_msg);
+    }
+    if (is_percent) {
+        _spill_data_limit_bytes =
+                (int64_t)(_spill_data_limit_bytes * 
config::storage_flood_stage_usage_percent /
+                          (double)100);
+    }
+    if (_spill_data_limit_bytes > disk_use_max_bytes) {
+        _spill_data_limit_bytes = disk_use_max_bytes;
+    }
 
     return Status::OK();
 }
-bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
-    double used_pct = get_usage(incoming_data_size);
+
+bool SpillDataDir::_reach_disk_capacity_limit(int64_t incoming_data_size) {
+    double used_pct = _get_disk_usage(incoming_data_size);
     int64_t left_bytes = _available_bytes - incoming_data_size;
     if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
         left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
@@ -285,4 +327,24 @@ bool SpillDataDir::reach_capacity_limit(int64_t 
incoming_data_size) {
     }
     return false;
 }
+bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
+    std::lock_guard<std::mutex> l(_mutex);
+    if (_reach_disk_capacity_limit(incoming_data_size)) {
+        return true;
+    }
+    if (_spill_data_bytes + incoming_data_size > _spill_data_limit_bytes) {
+        LOG_EVERY_T(WARNING, 1) << fmt::format(
+                "spill data reach limit, path: {}, capacity: {}, limit: {}, 
used: {}, available: "
+                "{}, "
+                "incoming "
+                "bytes: {}",
+                _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
+                PrettyPrinter::print_bytes(_spill_data_limit_bytes),
+                PrettyPrinter::print_bytes(_spill_data_bytes),
+                PrettyPrinter::print_bytes(_available_bytes),
+                PrettyPrinter::print_bytes(incoming_data_size));
+        return true;
+    }
+    return false;
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/spill/spill_stream_manager.h 
b/be/src/vec/spill/spill_stream_manager.h
index f73f840458e..2d7350f775f 100644
--- a/be/src/vec/spill/spill_stream_manager.h
+++ b/be/src/vec/spill/spill_stream_manager.h
@@ -31,48 +31,65 @@ class RuntimeProfile;
 
 namespace vectorized {
 
+class SpillStreamManager;
 class SpillDataDir {
 public:
-    SpillDataDir(const std::string& path, int64_t capacity_bytes = -1,
+    SpillDataDir(std::string path, int64_t capacity_bytes,
                  TStorageMedium::type storage_medium = TStorageMedium::HDD);
 
     Status init();
 
     const std::string& path() const { return _path; }
 
-    bool is_ssd_disk() const { return _storage_medium == TStorageMedium::SSD; }
-
     TStorageMedium::type storage_medium() const { return _storage_medium; }
 
     // check if the capacity reach the limit after adding the incoming data
     // return true if limit reached, otherwise, return false.
-    // TODO(cmy): for now we can not precisely calculate the capacity Doris 
used,
-    // so in order to avoid running out of disk capacity, we currently use the 
actual
-    // disk available capacity and total capacity to do the calculation.
-    // So that the capacity Doris actually used may exceeds the user specified 
capacity.
     bool reach_capacity_limit(int64_t incoming_data_size);
 
     Status update_capacity();
 
-    double get_usage(int64_t incoming_data_size) const {
+    void update_spill_data_usage(int64_t incoming_data_size) {
+        std::lock_guard<std::mutex> l(_mutex);
+        _spill_data_bytes += incoming_data_size;
+    }
+
+    int64_t get_spill_data_bytes() {
+        std::lock_guard<std::mutex> l(_mutex);
+        return _spill_data_bytes;
+    }
+
+    int64_t get_spill_data_limit() {
+        std::lock_guard<std::mutex> l(_mutex);
+        return _spill_data_limit_bytes;
+    }
+
+private:
+    bool _reach_disk_capacity_limit(int64_t incoming_data_size);
+    double _get_disk_usage(int64_t incoming_data_size) const {
         return _disk_capacity_bytes == 0
                        ? 0
                        : (_disk_capacity_bytes - _available_bytes + 
incoming_data_size) /
                                  (double)_disk_capacity_bytes;
     }
 
-private:
+    friend class SpillStreamManager;
     std::string _path;
 
-    // the actual available capacity of the disk of this data dir
-    size_t _available_bytes;
+    // protect _disk_capacity_bytes, _available_bytes, 
_spill_data_limit_bytes, _spill_data_bytes
+    std::mutex _mutex;
     // the actual capacity of the disk of this data dir
     size_t _disk_capacity_bytes;
+    int64_t _spill_data_limit_bytes = 0;
+    // the actual available capacity of the disk of this data dir
+    size_t _available_bytes = 0;
+    int64_t _spill_data_bytes = 0;
     TStorageMedium::type _storage_medium;
 };
 class SpillStreamManager {
 public:
-    SpillStreamManager(const std::vector<StorePath>& paths);
+    SpillStreamManager(std::unordered_map<std::string, 
std::unique_ptr<vectorized::SpillDataDir>>&&
+                               spill_store_map);
 
     Status init();
 
@@ -93,16 +110,6 @@ public:
 
     void gc(int64_t max_file_count);
 
-    void update_usage(const std::string& path, int64_t incoming_data_size) {
-        path_to_spill_data_size_[path] += incoming_data_size;
-    }
-
-    static bool reach_capacity_limit(size_t size, size_t incoming_data_size) {
-        return size + incoming_data_size > config::spill_storage_limit;
-    }
-
-    int64_t spilled_data_size(const std::string& path) { return 
path_to_spill_data_size_[path]; }
-
     ThreadPool* get_spill_io_thread_pool(const std::string& path) const {
         const auto it = path_to_io_thread_pool_.find(path);
         DCHECK(it != path_to_io_thread_pool_.end());
@@ -115,13 +122,11 @@ private:
     void _spill_gc_thread_callback();
     std::vector<SpillDataDir*> _get_stores_for_spill(TStorageMedium::type 
storage_medium);
 
-    std::vector<StorePath> _spill_store_paths;
     std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> 
_spill_store_map;
 
     CountDownLatch _stop_background_threads_latch;
     std::unique_ptr<ThreadPool> async_task_thread_pool_;
     std::unordered_map<std::string, std::unique_ptr<ThreadPool>> 
path_to_io_thread_pool_;
-    std::unordered_map<std::string, std::atomic_int64_t> 
path_to_spill_data_size_;
     scoped_refptr<Thread> _spill_gc_thread;
 
     std::atomic_uint64_t id_ = 0;
diff --git a/be/src/vec/spill/spill_writer.cpp 
b/be/src/vec/spill/spill_writer.cpp
index f657f580074..a48fb0b1dcb 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -52,7 +52,7 @@ Status SpillWriter::close() {
     total_written_bytes_ += meta_.size();
     COUNTER_UPDATE(write_bytes_counter_, meta_.size());
 
-    
ExecEnv::GetInstance()->spill_stream_mgr()->update_usage(data_dir_->path(), 
meta_.size());
+    data_dir_->update_spill_data_usage(meta_.size());
 
     RETURN_IF_ERROR(file_writer_->close());
 
@@ -112,21 +112,19 @@ Status SpillWriter::_write_internal(const Block& block, 
size_t& written_bytes) {
                         "serialize spill data error. [path={}]", file_path_);
             }
         }
-        auto* spill_stream_mgr = ExecEnv::GetInstance()->spill_stream_mgr();
-        auto splled_data_size = 
spill_stream_mgr->spilled_data_size(data_dir_->path());
-        if (spill_stream_mgr->reach_capacity_limit(splled_data_size, 
buff.size())) {
+        if (data_dir_->reach_capacity_limit(buff.size())) {
             return Status::Error<ErrorCode::DISK_REACH_CAPACITY_LIMIT>(
                     "spill data total size exceed limit, path: {}, size limit: 
{}, spill data "
                     "size: {}",
-                    data_dir_->path(), 
PrettyPrinter::print_bytes(config::spill_storage_limit),
-                    PrettyPrinter::print_bytes(
-                            
spill_stream_mgr->spilled_data_size(data_dir_->path())));
+                    data_dir_->path(),
+                    
PrettyPrinter::print_bytes(data_dir_->get_spill_data_limit()),
+                    
PrettyPrinter::print_bytes(data_dir_->get_spill_data_bytes()));
         }
 
         {
             Defer defer {[&]() {
                 if (status.ok()) {
-                    spill_stream_mgr->update_usage(data_dir_->path(), 
buff.size());
+                    data_dir_->update_spill_data_usage(buff.size());
                 }
             }};
             {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d0c91ab9b34..26b3cf4adc9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1713,7 +1713,7 @@ public class SessionVariable implements Serializable, 
Writable {
             description = {"控制是否启用join算子落盘。默认为 false。",
                     "Controls whether to enable spill to disk of join 
operation. "
                             + "The default value is false."},
-            needForward = true)
+            needForward = true, fuzzy = true)
     public boolean enableJoinSpill = false;
 
     @VariableMgr.VarAttr(
@@ -1721,7 +1721,7 @@ public class SessionVariable implements Serializable, 
Writable {
             description = {"控制是否启用排序算子落盘。默认为 false。",
                     "Controls whether to enable spill to disk of sort 
operation. "
                             + "The default value is false."},
-            needForward = true)
+            needForward = true, fuzzy = true)
     public boolean enableSortSpill = false;
 
     @VariableMgr.VarAttr(
@@ -1729,14 +1729,14 @@ public class SessionVariable implements Serializable, 
Writable {
             description = {"控制是否启用聚合算子落盘。默认为 false。",
                     "Controls whether to enable spill to disk of aggregation 
operation. "
                             + "The default value is false."},
-            needForward = true)
+            needForward = true, fuzzy = true)
     public boolean enableAggSpill = false;
 
     // If the memory consumption of sort node exceed this limit, will trigger 
spill to disk;
     // Set to 0 to disable; min: 128M
     public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
     @VariableMgr.VarAttr(name = EXTERNAL_SORT_BYTES_THRESHOLD,
-            checker = "checkExternalSortBytesThreshold", fuzzy = true)
+            checker = "checkExternalSortBytesThreshold", varType = 
VariableAnnotation.DEPRECATED)
     public long externalSortBytesThreshold = 0;
 
     // Set to 0 to disable; min: 128M
@@ -1747,7 +1747,7 @@ public class SessionVariable implements Serializable, 
Writable {
 
     // The memory limit of streaming agg when spilling is enabled
     // NOTE: streaming agg operator will not spill to disk.
-    @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT)
+    @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT, fuzzy = true)
     public long spillStreamingAggMemLimit = 268435456; //256MB
 
     public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4;
@@ -1887,6 +1887,38 @@ public class SessionVariable implements Serializable, 
Writable {
 
         // set random 1, 10, 100, 1000, 10000
         this.topnOptLimitThreshold = (int) Math.pow(10, random.nextInt(5));
+
+        // for spill to disk
+        /*
+        if (Config.pull_request_id > 10000) {
+            if (Config.pull_request_id % 2 == 1) {
+                this.enablePipelineXEngine = true;
+                this.enableJoinSpill = true;
+                this.enableSortSpill = true;
+                this.enableAggSpill = true;
+
+                randomInt = random.nextInt(4);
+                switch (randomInt) {
+                    case 0:
+                        this.minRevocableMem = 0;
+                        break;
+                    case 1:
+                        this.minRevocableMem = 1;
+                        break;
+                    case 2:
+                        this.minRevocableMem = 1024 * 1024;
+                        break;
+                    default:
+                        this.minRevocableMem = 100 * 1024 * 1024 * 1024;
+                        break;
+                }
+            } else {
+                this.enableJoinSpill = false;
+                this.enableSortSpill = false;
+                this.enableAggSpill = false;
+            }
+        }
+        */
     }
 
     public String printFuzzyVariables() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to