This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f8a9b025479 [fix](spill) fix storage engine type cast error (#32071)
f8a9b025479 is described below
commit f8a9b025479240a86d97ed7e29970840d1c99f2c
Author: TengJianPing <[email protected]>
AuthorDate: Mon Mar 11 22:48:16 2024 +0800
[fix](spill) fix storage engine type cast error (#32071)
---
be/src/vec/spill/spill_stream.cpp | 5 +++-
be/src/vec/spill/spill_stream.h | 11 ++++----
be/src/vec/spill/spill_stream_manager.cpp | 46 +++++++++++++++++++++++++------
be/src/vec/spill/spill_stream_manager.h | 43 +++++++++++++++++++++++++++--
be/src/vec/spill/spill_writer.cpp | 1 -
be/src/vec/spill/spill_writer.h | 6 ++--
6 files changed, 90 insertions(+), 22 deletions(-)
diff --git a/be/src/vec/spill/spill_stream.cpp
b/be/src/vec/spill/spill_stream.cpp
index d770e44a147..d08b63df40b 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -32,7 +32,7 @@
#include "vec/spill/spill_writer.h"
namespace doris::vectorized {
-SpillStream::SpillStream(RuntimeState* state, int64_t stream_id,
doris::DataDir* data_dir,
+SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir*
data_dir,
std::string spill_dir, size_t batch_rows, size_t
batch_bytes,
RuntimeProfile* profile)
: state_(state),
@@ -72,6 +72,9 @@ void SpillStream::close() {
(void)reader_->close();
}
+// Returns the root path of the SpillDataDir this stream was created with.
+// Defined out of line (instead of inline in spill_stream.h) because the header
+// now only forward-declares SpillDataDir, so path() is not visible there.
+// NOTE(review): data_dir_ is dereferenced unconditionally; this assumes the
+// stream was constructed with a non-null SpillDataDir — confirm at call sites.
+const std::string& SpillStream::get_spill_root_dir() const {
+ return data_dir_->path();
+}
Status SpillStream::prepare_spill() {
DCHECK(!spill_promise_);
RETURN_IF_ERROR(writer_->open());
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 579449e503c..9f328240d75 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -20,29 +20,28 @@
#include <future>
#include <memory>
-#include "olap/data_dir.h"
#include "vec/spill/spill_reader.h"
#include "vec/spill/spill_writer.h"
namespace doris {
class RuntimeProfile;
-class DataDir;
class ThreadPool;
namespace vectorized {
class Block;
+class SpillDataDir;
class SpillStream {
public:
- SpillStream(RuntimeState* state, int64_t stream_id, doris::DataDir*
data_dir,
+ SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir,
std::string spill_dir, size_t batch_rows, size_t batch_bytes,
RuntimeProfile* profile);
int64_t id() const { return stream_id_; }
- DataDir* get_data_dir() const { return data_dir_; }
- const std::string& get_spill_root_dir() const { return data_dir_->path(); }
+ SpillDataDir* get_data_dir() const { return data_dir_; }
+ const std::string& get_spill_root_dir() const;
const std::string& get_spill_dir() const { return spill_dir_; }
@@ -85,7 +84,7 @@ private:
ThreadPool* io_thread_pool_;
int64_t stream_id_;
std::atomic_bool closed_ = false;
- doris::DataDir* data_dir_ = nullptr;
+ SpillDataDir* data_dir_ = nullptr;
std::string spill_dir_;
size_t batch_rows_;
size_t batch_bytes_;
diff --git a/be/src/vec/spill/spill_stream_manager.cpp
b/be/src/vec/spill/spill_stream_manager.cpp
index 4dfd847482d..ecbee7fb858 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -29,9 +29,8 @@
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
-#include "olap/data_dir.h"
#include "olap/olap_define.h"
-#include "olap/storage_engine.h"
+#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "util/time.h"
#include "vec/spill/spill_stream.h"
@@ -112,9 +111,9 @@ void SpillStreamManager::_spill_gc_thread_callback() {
Status SpillStreamManager::_init_spill_store_map() {
for (const auto& path : _spill_store_paths) {
- auto store =
std::make_unique<DataDir>(ExecEnv::GetInstance()->storage_engine().to_local(),
- path.path, path.capacity_bytes,
path.storage_medium);
- auto st = store->init(false);
+ auto store =
+ std::make_unique<SpillDataDir>(path.path, path.capacity_bytes,
path.storage_medium);
+ auto st = store->init();
if (!st.ok()) {
LOG(WARNING) << "Store load failed, status=" << st.to_string()
<< ", path=" << store->path();
@@ -126,9 +125,9 @@ Status SpillStreamManager::_init_spill_store_map() {
return Status::OK();
}
-std::vector<DataDir*> SpillStreamManager::_get_stores_for_spill(
+std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill(
TStorageMedium::type storage_medium) {
- std::vector<DataDir*> stores;
+ std::vector<SpillDataDir*> stores;
for (auto&& [_, store] : _spill_store_map) {
if (store->storage_medium() == storage_medium &&
!store->reach_capacity_limit(0)) {
stores.push_back(store.get());
@@ -136,7 +135,7 @@ std::vector<DataDir*>
SpillStreamManager::_get_stores_for_spill(
}
std::sort(stores.begin(), stores.end(),
- [](DataDir* a, DataDir* b) { return a->get_usage(0) <
b->get_usage(0); });
+ [](SpillDataDir* a, SpillDataDir* b) { return a->get_usage(0) <
b->get_usage(0); });
size_t seventy_percent_index = stores.size();
size_t eighty_five_percent_index = stores.size();
@@ -176,7 +175,7 @@ Status
SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea
int64_t id = id_++;
std::string spill_dir;
- doris::DataDir* data_dir = nullptr;
+ SpillDataDir* data_dir = nullptr;
for (auto& dir : data_dirs) {
data_dir = dir;
std::string spill_root_dir = fmt::format("{}/{}", data_dir->path(),
SPILL_DIR_PREFIX);
@@ -259,4 +258,33 @@ void SpillStreamManager::gc(int64_t max_file_count) {
}
}
}
+
+// Describes one local spill directory at `path` on `storage_medium`.
+// Both capacity counters are initialised to 0 here, and nothing in this
+// constructor fills them in.
+// NOTE(review): `capacity_bytes` is accepted but never stored or used, so a
+// user-configured spill capacity currently has no effect. Confirm whether a
+// capacity member is missing from the class.
+SpillDataDir::SpillDataDir(const std::string& path, int64_t capacity_bytes,
+ TStorageMedium::type storage_medium)
+ : _path(path),
+ _available_bytes(0),
+ _disk_capacity_bytes(0),
+ _storage_medium(storage_medium) {}
+
+// Validates that this spill directory exists on the local filesystem.
+// Returns the error from the existence check if that check itself fails,
+// an IOError if the path does not exist, and OK otherwise.
+// NOTE(review): this does not refresh _available_bytes/_disk_capacity_bytes;
+// update_capacity() is not called here. Confirm whether capacity-based store
+// selection is expected to work with this change.
+Status SpillDataDir::init() {
+ bool exists = false;
+ RETURN_IF_ERROR(io::global_local_filesystem()->exists(_path, &exists));
+ // A missing directory is a hard failure for this store; the macro is
+ // presumably what logs "check file exist failed" — verify its definition.
+ if (!exists) {
+ RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed,
path={}", _path),
+ "check file exist failed");
+ }
+
+ return Status::OK();
+}
+
+// Returns true (and logs a warning) when writing `incoming_data_size` more
+// bytes would put this dir at or past BOTH flood-stage thresholds:
+// the usage ratio config::storage_flood_stage_usage_percent (in percent) AND
+// the remaining-bytes floor config::storage_flood_stage_left_capacity_bytes.
+// NOTE(review): _available_bytes and _disk_capacity_bytes start at 0 and are
+// not updated anywhere in this change. As a result get_usage() returns 0 and
+// this check cannot fire for any positive storage_flood_stage_usage_percent.
+bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
+ double used_pct = get_usage(incoming_data_size);
+ // _available_bytes is size_t, so this subtraction is done in unsigned
+ // arithmetic and the result is converted back to int64_t.
+ int64_t left_bytes = _available_bytes - incoming_data_size;
+ if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
+ left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
+ LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
+ << ", left bytes: " << left_bytes << ", path: " << _path;
+ return true;
+ }
+ return false;
+}
} // namespace doris::vectorized
diff --git a/be/src/vec/spill/spill_stream_manager.h
b/be/src/vec/spill/spill_stream_manager.h
index 2b412072038..f73f840458e 100644
--- a/be/src/vec/spill/spill_stream_manager.h
+++ b/be/src/vec/spill/spill_stream_manager.h
@@ -31,6 +31,45 @@ class RuntimeProfile;
namespace vectorized {
+// One local directory used as a spill target. It replaces doris::DataDir in
+// the spill code, so building a spill store no longer goes through
+// ExecEnv::storage_engine(); other spill headers only forward-declare it.
+class SpillDataDir {
+public:
+ // capacity_bytes defaults to -1 and storage_medium defaults to HDD.
+ // NOTE(review): with defaulted trailing parameters this is a converting
+ // constructor from std::string; consider marking it `explicit`.
+ SpillDataDir(const std::string& path, int64_t capacity_bytes = -1,
+ TStorageMedium::type storage_medium = TStorageMedium::HDD);
+
+ // Fails if the directory at path() does not exist.
+ Status init();
+
+ const std::string& path() const { return _path; }
+
+ bool is_ssd_disk() const { return _storage_medium == TStorageMedium::SSD; }
+
+ TStorageMedium::type storage_medium() const { return _storage_medium; }
+
+ // check if the capacity reach the limit after adding the incoming data
+ // return true if limit reached, otherwise, return false.
+ // TODO(cmy): for now we can not precisely calculate the capacity Doris used,
+ // so in order to avoid running out of disk capacity, we currently use the actual
+ // disk available capacity and total capacity to do the calculation.
+ // So the capacity Doris actually uses may exceed the user-specified capacity.
+ // NOTE(review): this relies on the two capacity fields below being kept
+ // up to date; they start at 0 (see the constructor) — confirm what refreshes them.
+ bool reach_capacity_limit(int64_t incoming_data_size);
+
+ // NOTE(review): declared here, but this change adds no definition for it.
+ // Any call would fail to link unless it is defined elsewhere — confirm.
+ Status update_capacity();
+
+ // Fraction of the disk in use after adding incoming_data_size bytes,
+ // or 0 when the disk capacity is unknown (_disk_capacity_bytes == 0).
+ // The numerator is computed in unsigned (size_t) arithmetic before the
+ // division by double.
+ double get_usage(int64_t incoming_data_size) const {
+ return _disk_capacity_bytes == 0
+ ? 0
+ : (_disk_capacity_bytes - _available_bytes +
incoming_data_size) /
+ (double)_disk_capacity_bytes;
+ }
+
+private:
+ std::string _path;
+
+ // the actual available capacity of the disk of this data dir
+ size_t _available_bytes;
+ // the actual capacity of the disk of this data dir
+ size_t _disk_capacity_bytes;
+ TStorageMedium::type _storage_medium;
+};
class SpillStreamManager {
public:
SpillStreamManager(const std::vector<StorePath>& paths);
@@ -74,10 +113,10 @@ public:
private:
Status _init_spill_store_map();
void _spill_gc_thread_callback();
- std::vector<DataDir*> _get_stores_for_spill(TStorageMedium::type
storage_medium);
+ std::vector<SpillDataDir*> _get_stores_for_spill(TStorageMedium::type
storage_medium);
std::vector<StorePath> _spill_store_paths;
- std::unordered_map<std::string, std::unique_ptr<DataDir>> _spill_store_map;
+ std::unordered_map<std::string, std::unique_ptr<SpillDataDir>>
_spill_store_map;
CountDownLatch _stop_background_threads_latch;
std::unique_ptr<ThreadPool> async_task_thread_pool_;
diff --git a/be/src/vec/spill/spill_writer.cpp
b/be/src/vec/spill/spill_writer.cpp
index 6e048ecd0dd..d51dcf8f1ec 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -21,7 +21,6 @@
#include "io/file_factory.h"
#include "io/fs/local_file_system.h"
#include "io/fs/local_file_writer.h"
-#include "olap/data_dir.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "vec/spill/spill_stream_manager.h"
diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h
index 1ecda1aff9c..14e8120f775 100644
--- a/be/src/vec/spill/spill_writer.h
+++ b/be/src/vec/spill/spill_writer.h
@@ -25,12 +25,12 @@
#include "util/runtime_profile.h"
#include "vec/core/block.h"
namespace doris {
-class DataDir;
namespace vectorized {
+class SpillDataDir;
class SpillWriter {
public:
- SpillWriter(int64_t id, size_t batch_size, doris::DataDir* data_dir, const
std::string& dir)
+ SpillWriter(int64_t id, size_t batch_size, SpillDataDir* data_dir, const
std::string& dir)
: data_dir_(data_dir), stream_id_(id), batch_size_(batch_size) {
file_path_ = dir + "/" + std::to_string(file_index_);
}
@@ -66,7 +66,7 @@ private:
// not owned, point to the data dir of this rowset
// for checking disk capacity when write data to disk.
- doris::DataDir* data_dir_ = nullptr;
+ SpillDataDir* data_dir_ = nullptr;
std::atomic_bool closed_ = false;
int64_t stream_id_;
size_t batch_size_;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]