This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a4a191fe56c [fix](index compaction)Fix MOW index compaction core (#32121) (#32657)
a4a191fe56c is described below
commit a4a191fe56c9d2b5f0d46315f8079de82a4a65ac
Author: qiye <[email protected]>
AuthorDate: Fri Mar 22 14:20:19 2024 +0800
[fix](index compaction)Fix MOW index compaction core (#32121) (#32657)
---
be/src/clucene | 2 +-
be/src/common/config.cpp | 2 +
be/src/common/config.h | 2 +
be/src/index-tools/index_tool.cpp | 141 +++++++++++++++++++++++++++++++++++++-
be/src/olap/compaction.cpp | 42 ++++++++++++
be/src/olap/tablet_meta.cpp | 3 +-
6 files changed, 186 insertions(+), 6 deletions(-)
diff --git a/be/src/clucene b/be/src/clucene
index c5ba0a26e9c..ef95e67ae31 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit c5ba0a26e9cab11a85dc3c5854e9ad258fa4fdf5
+Subproject commit ef95e67ae3123409f006072194f742a079603159
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 6bb6d52d834..ab7bd39e034 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1024,6 +1024,8 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096");
DEFINE_Int32(max_depth_in_bkd_tree, "32");
// index compaction
DEFINE_mBool(inverted_index_compaction_enable, "false");
+// Only for debug, do not use in production
+DEFINE_mBool(debug_inverted_index_compaction, "false");
// index by RAM directory
DEFINE_mBool(inverted_index_ram_dir_enable, "false");
// use num_broadcast_buffer blocks as buffer to do broadcast
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 85db11b9e61..4aaed552d89 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1061,6 +1061,8 @@ DECLARE_Int32(inverted_index_read_buffer_size);
DECLARE_Int32(max_depth_in_bkd_tree);
// index compaction
DECLARE_mBool(inverted_index_compaction_enable);
+// Only for debug, do not use in production
+DECLARE_mBool(debug_inverted_index_compaction);
// index by RAM directory
DECLARE_mBool(inverted_index_ram_dir_enable);
// use num_broadcast_buffer blocks as buffer to do broadcast
diff --git a/be/src/index-tools/index_tool.cpp b/be/src/index-tools/index_tool.cpp
index 3f541caf680..62593c11785 100644
--- a/be/src/index-tools/index_tool.cpp
+++ b/be/src/index-tools/index_tool.cpp
@@ -25,6 +25,7 @@
#include <fstream>
#include <iostream>
#include <memory>
+#include <nlohmann/json.hpp>
#include <roaring/roaring.hh>
#include <sstream>
#include <string>
@@ -62,7 +63,8 @@ using namespace lucene::index;
using namespace lucene::util;
using namespace lucene::search;
-DEFINE_string(operation, "", "valid operation:
show_nested_files,check_terms,term_query");
+DEFINE_string(operation, "",
+ "valid operation:
show_nested_files,check_terms,term_query,debug_index_compaction");
DEFINE_string(directory, "./", "inverted index file directory");
DEFINE_string(idx_file_name, "", "inverted index file name");
@@ -72,19 +74,33 @@ DEFINE_string(term, "", "inverted index term to query");
DEFINE_string(column_name, "", "inverted index column_name to query");
DEFINE_string(pred_type, "", "inverted index term query predicate,
eq/lt/gt/le/ge/match etc.");
DEFINE_bool(print_row_id, false, "print row id when query terms");
+DEFINE_bool(print_doc_id, false, "print doc id when check terms stats");
+// only for debug index compaction
+DEFINE_int32(idx_id, -1, "inverted index id");
+DEFINE_string(src_idx_dirs_file, "", "source segment index files");
+DEFINE_string(dest_idx_dirs_file, "", "destination segment index files");
+DEFINE_string(dest_seg_num_rows_file, "", "destination segment number of
rows");
+DEFINE_string(tablet_path, "", "tablet path");
+DEFINE_string(trans_vec_file, "", "rowid conversion map file");
std::string get_usage(const std::string& progname) {
std::stringstream ss;
ss << progname << " is the Doris inverted index file tool.\n";
- ss << "Stop BE first before use this tool.\n";
ss << "Usage:\n";
ss << "./index_tool --operation=show_nested_files
--idx_file_path=path/to/file\n";
- ss << "./index_tool --operation=check_terms_stats
--idx_file_path=path/to/file\n";
+ ss << "./index_tool --operation=check_terms_stats
--idx_file_path=path/to/file "
+ "--print_doc_id\n";
ss << "./index_tool --operation=term_query --directory=directory "
"--idx_file_name=file --print_row_id --term=term
--column_name=column_name "
"--pred_type=eq/lt/gt/le/ge/match etc\n";
ss << "./index_tool --operation=write_index_v2
--idx_file_path=path/to/index "
"--data_file_path=data/to/index\n";
+ ss << "*** debug_index_compaction operation is only for offline debug
index compaction, do not "
+ "use in production ***\n";
+ ss << "./index_tool --operation=debug_index_compaction --idx_id=index_id "
+ "--src_idx_dirs_file=path/to/file --dest_idx_dirs_file=path/to/file "
+ "--dest_seg_num_rows_file=path/to/file --tablet_path=path/to/tablet "
+ "--trans_vec_file=path/to/file\n";
return ss.str();
}
@@ -192,6 +208,14 @@ void check_terms_stats(lucene::store::Directory* dir) {
printf("Term: %s ", token.c_str());
printf("Freq: %d\n", te->docFreq());
+ if (FLAGS_print_doc_id) {
+ TermDocs* td = r->termDocs(te->term());
+ while (td->next()) {
+ printf("DocID: %d ", td->doc());
+ printf("TermFreq: %d\n", td->freq());
+ }
+ _CLLDELETE(td);
+ }
}
printf("Term count: %d\n\n", nterms);
te->close();
@@ -351,6 +375,117 @@ int main(int argc, char** argv) {
}
return -1;
}
+ } else if (FLAGS_operation == "debug_index_compaction") {
+ // only for debug index compaction, do not use in production
+ if (FLAGS_idx_id <= 0 || FLAGS_src_idx_dirs_file == "" ||
FLAGS_dest_idx_dirs_file == "" ||
+ FLAGS_dest_seg_num_rows_file == "" || FLAGS_tablet_path == "" ||
+ FLAGS_trans_vec_file == "") {
+ std::cout << "invalid params for debug_index_compaction " <<
std::endl;
+ return -1;
+ }
+
+ auto fs = doris::io::global_local_filesystem();
+
+ auto read_file_to_json = [&](const std::string& file, std::string&
output) {
+ doris::io::FileReaderSPtr file_reader;
+ doris::Status status = fs->open_file(file, &file_reader);
+ if (!status.ok()) {
+ std::cout << "read file " << file << " failed" << std::endl;
+ return false;
+ }
+ size_t fsize = file_reader->size();
+ if (fsize > 0) {
+ output.resize(fsize);
+ size_t bytes_read = 0;
+ status = file_reader->read_at(0, {output.data(), fsize},
&bytes_read);
+ }
+ if (!status.ok()) {
+ std::cout << "read file " << file << " failed" << std::endl;
+ return false;
+ }
+ return true;
+ };
+
+ int32_t index_id = FLAGS_idx_id;
+ std::string tablet_path = FLAGS_tablet_path;
+ std::string src_index_dirs_string;
+ std::string dest_index_dirs_string;
+ std::string dest_segment_num_rows_string;
+ std::string trans_vec_string;
+
+ if (!read_file_to_json(FLAGS_src_idx_dirs_file, src_index_dirs_string)
||
+ !read_file_to_json(FLAGS_dest_idx_dirs_file,
dest_index_dirs_string) ||
+ !read_file_to_json(FLAGS_dest_seg_num_rows_file,
dest_segment_num_rows_string) ||
+ !read_file_to_json(FLAGS_trans_vec_file, trans_vec_string)) {
+ return -1;
+ }
+ std::vector<std::string> src_index_files =
nlohmann::json::parse(src_index_dirs_string);
+ std::vector<std::string> dest_index_files =
nlohmann::json::parse(dest_index_dirs_string);
+ std::vector<uint32_t> dest_segment_num_rows =
+ nlohmann::json::parse(dest_segment_num_rows_string);
+ std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec =
+ nlohmann::json::parse(trans_vec_string);
+ int src_segment_num = src_index_files.size();
+ int dest_segment_num = dest_index_files.size();
+
+ std::string index_writer_path = tablet_path + "/tmp_index_writer";
+ lucene::store::Directory* dir =
+ DorisCompoundDirectoryFactory::getDirectory(fs,
index_writer_path.c_str(), false);
+ lucene::analysis::SimpleAnalyzer<char> analyzer;
+ auto index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer,
true /* create */,
+ true /*
closeDirOnShutdown */);
+ std::ostream* infoStream = &std::cout;
+ index_writer->setInfoStream(infoStream);
+ // get compound directory src_index_dirs
+ std::vector<lucene::store::Directory*> src_index_dirs(src_segment_num);
+ for (int i = 0; i < src_segment_num; ++i) {
+ // format: rowsetId_segmentId_indexId.idx
+ std::string src_idx_full_name =
+ src_index_files[i] + "_" + std::to_string(index_id) +
".idx";
+ DorisCompoundReader* reader = new DorisCompoundReader(
+ DorisCompoundDirectoryFactory::getDirectory(fs,
tablet_path.c_str()),
+ src_idx_full_name.c_str());
+ src_index_dirs[i] = reader;
+ }
+
+ // get dest idx file paths
+ std::vector<lucene::store::Directory*>
dest_index_dirs(dest_segment_num);
+ for (int i = 0; i < dest_segment_num; ++i) {
+ // format: rowsetId_segmentId_columnId
+ auto path = tablet_path + "/" + dest_index_files[i] + "_" +
std::to_string(index_id);
+ dest_index_dirs[i] =
+ DorisCompoundDirectoryFactory::getDirectory(fs,
path.c_str(), true);
+ }
+
+ index_writer->indexCompaction(src_index_dirs, dest_index_dirs,
trans_vec,
+ dest_segment_num_rows);
+
+ index_writer->close();
+ _CLDELETE(index_writer);
+ // NOTE: need to ref_cnt-- for dir,
+ // when index_writer is destroyed, if closeDir is set, dir will be
close
+ // _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1,
dir will be destroyed.
+ _CLDECDELETE(dir)
+ for (auto d : src_index_dirs) {
+ if (d != nullptr) {
+ d->close();
+ _CLDELETE(d);
+ }
+ }
+ for (auto d : dest_index_dirs) {
+ if (d != nullptr) {
+ // NOTE: DO NOT close dest dir here, because it will be closed
when dest index writer finalize.
+ //d->close();
+ _CLDELETE(d);
+ }
+ }
+
+ // delete temporary index_writer_path
+ if (!fs->delete_directory(index_writer_path.c_str()).ok()) {
+ std::cout << "delete temporary index writer path: " <<
index_writer_path << " failed."
+ << std::endl;
+ return -1;
+ }
} else if (FLAGS_operation == "write_index_v2") {
if (FLAGS_idx_file_path == "") {
std::cout << "no index path flag for check " << std::endl;
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index c4dd3cf31c5..e852344688c 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -27,6 +27,7 @@
#include <map>
#include <memory>
#include <mutex>
+#include <nlohmann/json.hpp>
#include <numeric>
#include <ostream>
#include <set>
@@ -36,6 +37,7 @@
#include "common/config.h"
#include "common/status.h"
#include "io/fs/file_system.h"
+#include "io/fs/file_writer.h"
#include "io/fs/remote_file_system.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/cumulative_compaction_time_series_policy.h"
@@ -471,6 +473,46 @@ Status Compaction::do_compaction_impl(int64_t permits) {
dest_index_files[i] = prefix;
}
+ // Only write info files when debug index compaction is enabled.
+ // The files are used to debug index compaction and works with
index_tool.
+ if (config::debug_inverted_index_compaction) {
+ auto write_json_to_file = [&](const nlohmann::json& json_obj,
+ const std::string& file_name) {
+ io::FileWriterPtr file_writer;
+ std::string file_path =
+ fmt::format("{}/{}.json", config::sys_log_dir,
file_name);
+ RETURN_IF_ERROR(
+
io::global_local_filesystem()->create_file(file_path, &file_writer));
+ RETURN_IF_ERROR(file_writer->append(json_obj.dump()));
+ RETURN_IF_ERROR(file_writer->append("\n"));
+ return file_writer->close();
+ };
+
+ // Convert trans_vec to JSON and print it
+ nlohmann::json trans_vec_json = trans_vec;
+ auto output_version = _output_version.to_string().substr(
+ 1, _output_version.to_string().size() - 2);
+ RETURN_IF_ERROR(write_json_to_file(
+ trans_vec_json,
+ fmt::format("trans_vec_{}_{}", _tablet->tablet_id(),
output_version)));
+
+ nlohmann::json src_index_files_json = src_index_files;
+ RETURN_IF_ERROR(write_json_to_file(
+ src_index_files_json,
+ fmt::format("src_idx_dirs_{}_{}",
_tablet->tablet_id(), output_version)));
+
+ nlohmann::json dest_index_files_json = dest_index_files;
+ RETURN_IF_ERROR(write_json_to_file(
+ dest_index_files_json,
+ fmt::format("dest_idx_dirs_{}_{}",
_tablet->tablet_id(), output_version)));
+
+ nlohmann::json dest_segment_num_rows_json =
dest_segment_num_rows;
+ RETURN_IF_ERROR(
+ write_json_to_file(dest_segment_num_rows_json,
+
fmt::format("dest_seg_num_rows_{}_{}",
+ _tablet->tablet_id(),
output_version)));
+ }
+
// create index_writer to compaction indexes
const auto& fs = _output_rowset->rowset_meta()->fs();
const auto& tablet_path = _tablet->tablet_path();
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 23f4428f747..cc9b9dc0b9c 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -71,8 +71,7 @@ TabletMetaSharedPtr TabletMeta::create(
request.time_series_compaction_file_count_threshold,
request.time_series_compaction_time_threshold_seconds,
request.time_series_compaction_empty_rowsets_threshold,
- request.inverted_index_storage_format,
- request.time_series_compaction_level_threshold);
+ request.inverted_index_storage_format,
request.time_series_compaction_level_threshold);
}
TabletMeta::TabletMeta()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]