This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 72cca294210 [Update](cloud) add inverted index tmp dir support (#31484)
72cca294210 is described below
commit 72cca294210a0ad7eb138aaf54529f9147f37bf0
Author: airborne12 <[email protected]>
AuthorDate: Fri Mar 1 21:29:00 2024 +0800
[Update](cloud) add inverted index tmp dir support (#31484)
---
be/src/common/config.cpp | 2 ++
be/src/common/config.h | 3 ++
.../inverted_index_compound_directory.cpp | 3 +-
.../rowset/segment_v2/inverted_index_writer.cpp | 15 +++++++--
.../olap/rowset/segment_v2/inverted_index_writer.h | 36 ++++++++++++++++++++++
be/src/runtime/exec_env.h | 5 +++
be/src/runtime/exec_env_init.cpp | 2 ++
7 files changed, 63 insertions(+), 3 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index dc3b7ae0449..b30191d9e17 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -908,6 +908,8 @@ DEFINE_Validator(file_cache_type, [](std::string_view config) -> bool {
return config.empty() || config == "file_block_cache";
});
+DEFINE_String(tmp_file_dir, "tmp");
+
DEFINE_Int32(s3_transfer_executor_pool_size, "2");
DEFINE_Bool(enable_time_lut, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b95969b761b..0160919e66d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1257,6 +1257,9 @@ DECLARE_mBool(check_segment_when_build_rowset_meta);
// max s3 client retry times
DECLARE_mInt32(max_s3_client_retry);
+// write as inverted index tmp directory
+DECLARE_String(tmp_file_dir);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
index 4ed3258d74d..1a0a3e9ab1d 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
@@ -399,7 +399,8 @@ void DorisCompoundDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_
void DorisCompoundDirectory::FSIndexOutput::init(const io::FileSystemSPtr&
fileSystem,
const char* path) {
- Status status = fileSystem->create_file(path, &_writer);
+ io::FileWriterOptions opts {.create_empty_file = false};
+ Status status = fileSystem->create_file(path, &_writer, &opts);
DBUG_EXECUTE_IF(
"DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_"
"init",
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index 07bea0c83f3..9cd9897f6f5 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -32,11 +32,13 @@
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wshadow-field"
#endif
+
#include "CLucene/analysis/standard95/StandardAnalyzer.h"
+
#ifdef __clang__
#pragma clang diagnostic pop
#endif
-#include "common/config.h"
+
#include "olap/field.h"
#include "olap/inverted_index_parser.h"
#include "olap/key_coder.h"
@@ -50,6 +52,7 @@
#include "olap/tablet_schema.h"
#include "olap/types.h"
#include "runtime/collection_value.h"
+#include "runtime/exec_env.h"
#include "util/debug_points.h"
#include "util/faststring.h"
#include "util/slice.h"
@@ -209,8 +212,14 @@ public:
return Status::InternalError("init_fulltext_index directory
already exists");
}
+ auto tmp_file_dir =
ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
+ _lfs = io::global_local_filesystem();
+ auto lfs_index_path =
InvertedIndexDescriptor::get_temporary_index_path(
+ tmp_file_dir / _segment_file_name, _index_meta->index_id(),
+ _index_meta->get_index_suffix());
dir =
std::unique_ptr<DorisCompoundDirectory>(DorisCompoundDirectoryFactory::getDirectory(
- _fs, index_path.c_str(), use_compound_file_writer,
can_use_ram_dir));
+ _lfs, lfs_index_path.c_str(), use_compound_file_writer,
can_use_ram_dir, nullptr,
+ _fs, index_path.c_str()));
return Status::OK();
}
@@ -451,6 +460,7 @@ public:
}
return Status::OK();
}
+
Status add_array_values(size_t field_size, const CollectionValue* values,
size_t count) override {
if constexpr (field_is_slice_type(field_type)) {
@@ -620,6 +630,7 @@ private:
std::string _segment_file_name;
std::string _directory;
io::FileSystemSPtr _fs;
+ io::FileSystemSPtr _lfs;
const KeyCoder* _value_key_coder;
const TabletIndex* _index_meta;
InvertedIndexParserType _parser_type;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_writer.h
index 44cc41789b5..fee81f8235a 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.h
@@ -21,15 +21,22 @@
#include <stddef.h>
#include <stdint.h>
+#include <atomic>
#include <memory>
#include <string>
+#include "common/config.h"
#include "common/status.h"
+#include "gutil/strings/split.h"
#include "io/fs/file_system.h"
+#include "io/fs/local_file_system.h"
+#include "olap/options.h"
namespace doris {
class CollectionValue;
+
class Field;
+
class TabletIndex;
namespace segment_v2 {
@@ -66,5 +73,34 @@ private:
DISALLOW_COPY_AND_ASSIGN(InvertedIndexColumnWriter);
};
+class TmpFileDirs {
+public:
+ TmpFileDirs(const std::vector<doris::StorePath>& store_paths) {
+ for (const auto& store_path : store_paths) {
+ _tmp_file_dirs.emplace_back(store_path.path + "/" +
config::tmp_file_dir);
+ }
+ };
+
+ Status init() {
+ for (auto& tmp_file_dir : _tmp_file_dirs) {
+ bool exists = true;
+
RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_file_dir, &exists));
+ if (!exists) {
+
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_file_dir));
+ }
+ }
+ return Status::OK();
+ };
+
+ io::Path get_tmp_file_dir() {
+ size_t cur_index = _next_index.fetch_add(1);
+ return _tmp_file_dirs[cur_index % _tmp_file_dirs.size()];
+ };
+
+private:
+ std::vector<io::Path> _tmp_file_dirs;
+ std::atomic_size_t _next_index {0}; // use for round-robin
+};
+
} // namespace segment_v2
} // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 171f4400ef2..895f895701c 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -33,6 +33,7 @@
#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/options.h"
+#include "olap/rowset/segment_v2/inverted_index_writer.h"
#include "olap/tablet_fwd.h"
#include "pipeline/pipeline_tracing.h"
#include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove
this include header
@@ -58,6 +59,7 @@ class FileCacheFactory;
namespace segment_v2 {
class InvertedIndexSearcherCache;
class InvertedIndexQueryCache;
+class TmpFileDirs;
} // namespace segment_v2
class WorkloadSchedPolicyMgr;
@@ -272,6 +274,8 @@ public:
return _pipeline_tracer_ctx.get();
}
+ segment_v2::TmpFileDirs* get_tmp_file_dirs() { return
_tmp_file_dirs.get(); }
+
private:
ExecEnv();
@@ -385,6 +389,7 @@ private:
RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr;
std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx;
+ std::unique_ptr<segment_v2::TmpFileDirs> _tmp_file_dirs;
};
template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 3098788cff2..0b441484f01 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -151,6 +151,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
}
init_doris_metrics(store_paths);
_store_paths = store_paths;
+ _tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(_store_paths);
+ RETURN_IF_ERROR(_tmp_file_dirs->init());
_user_function_cache = new UserFunctionCache();
static_cast<void>(_user_function_cache->init(doris::config::user_function_dir));
_external_scan_context_mgr = new ExternalScanContextMgr(this);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]