This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e9a14d700e8 [opt](index compaction)Add dual write inverted index file
switch (#42280)
e9a14d700e8 is described below
commit e9a14d700e87a995a1d4cd84aa3457b61997ece8
Author: qiye <[email protected]>
AuthorDate: Fri Oct 25 11:35:18 2024 +0800
[opt](index compaction)Add dual write inverted index file switch (#42280)
## Proposed changes
To check the correctness of the index file produced by the index compaction
process, we add a switch `dual_write_inverted_index_enable` (default is `false`).
When both `inverted_index_compaction_enable` and
`dual_write_inverted_index_enable` are `true`, Doris produces the index
file through both the normal compaction process and the index compaction
process, and then compares the two index files, which should theoretically be
identical. If the check fails, Doris logs FATAL and crashes.
This feature is for testing only; *DO NOT* use it in production.
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 4 +
be/src/olap/compaction.cpp | 151 ++++++++++++++++++++-
be/src/olap/compaction.h | 3 +
be/src/olap/rowset/segment_v2/segment_writer.cpp | 3 +-
.../rowset/segment_v2/vertical_segment_writer.cpp | 3 +-
6 files changed, 163 insertions(+), 3 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 31170b731f4..d031189141e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1042,6 +1042,8 @@ DEFINE_Int32(max_depth_in_bkd_tree, "32");
DEFINE_mBool(inverted_index_compaction_enable, "false");
// Only for debug, do not use in production
DEFINE_mBool(debug_inverted_index_compaction, "false");
+// Only for debug, do not use in production
+DEFINE_mBool(dual_write_inverted_index_enable, "false");
// index by RAM directory
DEFINE_mBool(inverted_index_ram_dir_enable, "true");
// use num_broadcast_buffer blocks as buffer to do broadcast
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 585c4dc45cc..d5daf0c6924 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1090,7 +1090,11 @@ DECLARE_Int32(max_depth_in_bkd_tree);
// index compaction
DECLARE_mBool(inverted_index_compaction_enable);
// Only for debug, do not use in production
+// Debug switch for collecting intermediate data in inverted index compaction
DECLARE_mBool(debug_inverted_index_compaction);
+// Only for debug, do not use in production
+// Debug switch for writing the inverted index through both the normal compaction process
+// and the index compaction process, so that the two resulting index files can be compared
+DECLARE_mBool(dual_write_inverted_index_enable);
// index by RAM directory
DECLARE_mBool(inverted_index_ram_dir_enable);
// use num_broadcast_buffer blocks as buffer to do broadcast
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index dee06a8a79b..d23510d373d 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -18,6 +18,7 @@
#include "olap/compaction.h"
#include <fmt/format.h>
+#include <gen_cpp/olap_common.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
@@ -35,6 +36,8 @@
#include <shared_mutex>
#include <utility>
+#include "CLucene/config/repl_wchar.h"
+#include "CLucene/index/Terms.h"
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "common/config.h"
@@ -619,6 +622,7 @@ Status Compaction::do_inverted_index_compaction() {
// Some columns have already been indexed
// key: seg_id, value: inverted index file size
std::unordered_map<int, int64_t> compacted_idx_file_size;
+ auto tmp_file_dir =
ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
std::string index_path_prefix {
InvertedIndexDescriptor::get_index_file_path_prefix(ctx.segment_path(seg_id))};
@@ -644,6 +648,12 @@ Status Compaction::do_inverted_index_compaction() {
}
compacted_idx_file_size[seg_id] = fsize;
}
+ // if dual_write_inverted_index_enable is true, we need to write
inverted index to tmp dir
+ if (config::dual_write_inverted_index_enable) {
+ auto tmp_index_path_prefix =
+ tmp_file_dir / (dest_rowset_id.to_string() + "_" +
std::to_string(seg_id));
+ index_path_prefix = tmp_index_path_prefix;
+ }
auto inverted_index_file_writer =
std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(),
seg_id,
_cur_tablet_schema->get_inverted_index_storage_format());
@@ -667,7 +677,6 @@ Status Compaction::do_inverted_index_compaction() {
}
// use tmp file dir to store index files
- auto tmp_file_dir =
ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
auto index_tmp_path = tmp_file_dir / dest_rowset_id.to_string();
LOG(INFO) << "start index compaction"
<< ". tablet=" << _tablet->tablet_id() << ", source index size="
<< src_segment_num
@@ -754,6 +763,70 @@ Status Compaction::do_inverted_index_compaction() {
return status;
}
+ // check idx file correctness only when dual_write_inverted_index_enable
is true
+ if (config::dual_write_inverted_index_enable) {
+ for (auto&& column_uniq_id : ctx.columns_to_do_index_compaction) {
+ auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
+ const auto* index_meta =
_cur_tablet_schema->get_inverted_index(col);
+ for (int dest_segment_id = 0; dest_segment_id < dest_segment_num;
dest_segment_id++) {
+ // create index file reader for normal compaction index file
+ std::string index_path_prefix
{InvertedIndexDescriptor::get_index_file_path_prefix(
+ ctx.segment_path(dest_segment_id))};
+ io::Path cfs_path;
+ if (_cur_tablet_schema->get_inverted_index_storage_format() !=
+ doris::InvertedIndexStorageFormatPB::V1) {
+ cfs_path =
InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix);
+ } else {
+ cfs_path = InvertedIndexDescriptor::get_index_file_path_v1(
+ index_path_prefix, index_meta->index_id(),
+ index_meta->get_index_suffix());
+ }
+ auto inverted_index_file_reader =
std::make_unique<InvertedIndexFileReader>(
+ ctx.fs(), index_path_prefix,
+
_cur_tablet_schema->get_inverted_index_storage_format());
+ bool open_idx_file_cache = false;
+ auto st =
inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
+
open_idx_file_cache);
+ if (!st.ok()) {
+ LOG(FATAL) << "inverted_index_file_reader init failed in
index compaction "
+ "correctness check, error:"
+ << st;
+ }
+ auto index_reader =
DORIS_TRY(inverted_index_file_reader->open(index_meta));
+
+ // create index file reader for tmp index compaction index file
+ auto tmp_index_path_prefix = tmp_file_dir /
(dest_rowset_id.to_string() + "_" +
+
std::to_string(dest_segment_id));
+ auto tmp_inverted_index_file_reader =
std::make_unique<InvertedIndexFileReader>(
+ doris::io::global_local_filesystem(),
tmp_index_path_prefix,
+
_cur_tablet_schema->get_inverted_index_storage_format());
+ st =
tmp_inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
+ open_idx_file_cache);
+ if (!st.ok()) {
+ LOG(FATAL) << "tmp_inverted_index_file_reader init failed
in index compaction "
+ "correctness check, error:"
+ << st;
+ }
+ auto tmp_index_reader =
DORIS_TRY(tmp_inverted_index_file_reader->open(index_meta));
+
+ st = check_idx_file_correctness(*index_reader,
*tmp_index_reader);
+ if (!st.ok()) {
+ LOG(FATAL) << "index compaction correctness check failed"
+ << ", tablet=" << _tablet->tablet_id() << ",
index_path=" << cfs_path
+ << ", tmp_index_path="
+ << (tmp_index_path_prefix.string() + "_" +
+ std::to_string(index_meta->index_id()) +
".idx")
+ << ", error=" << st.msg();
+ }
+ LOG(INFO) << "index compaction correctness check succeed"
+ << ", tablet=" << _tablet->tablet_id() << ",
index_path=" << cfs_path
+ << ", tmp_index_path="
+ << (tmp_index_path_prefix.string() + "_" +
+ std::to_string(index_meta->index_id()) + ".idx");
+ }
+ }
+ }
+
// index compaction should update total disk size and index disk size
_output_rowset->rowset_meta()->set_data_disk_size(_output_rowset->data_disk_size()
+
inverted_index_file_size);
@@ -776,6 +849,82 @@ Status Compaction::do_inverted_index_compaction() {
return Status::OK();
}
+// Checks that two inverted index files hold identical content.
+//
+// A CLucene IndexReader is opened over each compound reader and the two are compared
+// in lockstep:
+//   1. the document counts (numDocs),
+//   2. every term (field name and token text, in enumeration order),
+//   3. every posting of every term (docId and freq).
+//
+// index_reader:     compound reader over the first index file; in
+//                   do_inverted_index_compaction() this is the file written by the
+//                   normal compaction process.
+// tmp_index_reader: compound reader over the second index file; in
+//                   do_inverted_index_compaction() this is the file written to the tmp
+//                   dir by the index compaction process.
+//
+// Returns Status::OK() when both indexes match, otherwise Status::InternalError
+// describing the first difference that was found.
+Status Compaction::check_idx_file_correctness(DorisCompoundReader& index_reader,
+                                              DorisCompoundReader& tmp_index_reader) {
+    // IndexReader::open(), terms() and termDocs() return raw pointers that the caller
+    // has to close and delete. Wrapping them guarantees the release on every return
+    // path, including the early error returns below.
+    auto close_and_delete = [](auto* ptr) {
+        if (ptr != nullptr) {
+            ptr->close();
+            delete ptr;
+        }
+    };
+    using IndexReaderPtr = std::unique_ptr<lucene::index::IndexReader, decltype(close_and_delete)>;
+    using TermEnumPtr = std::unique_ptr<lucene::index::TermEnum, decltype(close_and_delete)>;
+    using TermDocsPtr = std::unique_ptr<lucene::index::TermDocs, decltype(close_and_delete)>;
+
+    // Declaration order matters: the enumerators declared later are destroyed (closed)
+    // before the readers they were obtained from.
+    IndexReaderPtr idx_reader(lucene::index::IndexReader::open(&index_reader), close_and_delete);
+    IndexReaderPtr tmp_idx_reader(lucene::index::IndexReader::open(&tmp_index_reader),
+                                  close_and_delete);
+
+    // compare numDocs
+    if (idx_reader->numDocs() != tmp_idx_reader->numDocs()) {
+        return Status::InternalError(
+                "index compaction correctness check failed, numDocs not equal, idx_numDocs={}, "
+                "tmp_idx_numDocs={}",
+                idx_reader->numDocs(), tmp_idx_reader->numDocs());
+    }
+
+    TermEnumPtr term_enum(idx_reader->terms(), close_and_delete);
+    TermEnumPtr tmp_term_enum(tmp_idx_reader->terms(), close_and_delete);
+
+    // Iterate both TermEnums in lockstep. Both sides are advanced on every iteration
+    // (no short-circuit), so a surplus term on either side is always detected instead of
+    // being silently consumed by the loop condition.
+    while (true) {
+        const bool has_term = term_enum->next();
+        const bool tmp_has_term = tmp_term_enum->next();
+        if (has_term != tmp_has_term) {
+            return Status::InternalError(
+                    "index compaction correctness check failed, number of terms not equal");
+        }
+        if (!has_term) {
+            break;
+        }
+
+        // term(false) is used throughout so that no extra reference is taken on the
+        // enumerator's current term (NOTE(review): assumes CLucene's term(bool pointer)
+        // semantics; the terms are only used within this iteration).
+        lucene::index::Term* term = term_enum->term(false);
+        lucene::index::Term* tmp_term = tmp_term_enum->term(false);
+
+        std::string token = lucene_wcstoutf8string(term->text(), term->textLength());
+        std::string field = lucene_wcstoutf8string(term->field(), lenOfString(term->field()));
+        std::string tmp_token =
+                lucene_wcstoutf8string(tmp_term->text(), tmp_term->textLength());
+        std::string tmp_field =
+                lucene_wcstoutf8string(tmp_term->field(), lenOfString(tmp_term->field()));
+
+        // compare token and field
+        if (field != tmp_field) {
+            return Status::InternalError(
+                    "index compaction correctness check failed, fields not equal, field={}, "
+                    "tmp_field={}",
+                    field, tmp_field);
+        }
+        if (token != tmp_token) {
+            return Status::InternalError(
+                    "index compaction correctness check failed, tokens not equal, token={}, "
+                    "tmp_token={}",
+                    token, tmp_token);
+        }
+
+        // get term's docId and freq
+        TermDocsPtr term_docs(idx_reader->termDocs(term), close_and_delete);
+        TermDocsPtr tmp_term_docs(tmp_idx_reader->termDocs(tmp_term), close_and_delete);
+
+        // Compare the postings in lockstep; as above, both iterators are always advanced
+        // together so that differing posting counts cannot slip through.
+        while (true) {
+            const bool has_doc = term_docs->next();
+            const bool tmp_has_doc = tmp_term_docs->next();
+            if (has_doc != tmp_has_doc) {
+                return Status::InternalError(
+                        "index compaction correctness check failed, number of docs not equal for "
+                        "term={}, tmp_term={}",
+                        token, tmp_token);
+            }
+            if (!has_doc) {
+                break;
+            }
+            if (term_docs->doc() != tmp_term_docs->doc() ||
+                term_docs->freq() != tmp_term_docs->freq()) {
+                return Status::InternalError(
+                        "index compaction correctness check failed, docId or freq not equal, "
+                        "docId={}, tmp_docId={}, freq={}, tmp_freq={}",
+                        term_docs->doc(), tmp_term_docs->doc(), term_docs->freq(),
+                        tmp_term_docs->freq());
+            }
+        }
+    }
+
+    return Status::OK();
+}
+
void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
for (const auto& index : _cur_tablet_schema->indexes()) {
if (index.index_type() != IndexType::INVERTED) {
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 08afe840280..99de5573a91 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -69,6 +69,9 @@ protected:
Status do_inverted_index_compaction();
+ Status check_idx_file_correctness(DorisCompoundReader& index_reader,
+ DorisCompoundReader& tmp_index_reader);
+
void construct_index_compaction_columns(RowsetWriterContext& ctx);
virtual Status construct_output_rowset_writer(RowsetWriterContext& ctx) =
0;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 4301303dac9..f18f5d5d641 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -208,7 +208,8 @@ Status SegmentWriter::_create_column_writer(uint32_t cid,
const TabletColumn& co
opts.need_bitmap_index = column.has_bitmap_index();
bool skip_inverted_index = false;
- if (_opts.rowset_ctx != nullptr) {
+ // if dual_write_inverted_index_enable is true, do not skip write inverted
index on index compaction columns
+ if (_opts.rowset_ctx != nullptr &&
!config::dual_write_inverted_index_enable) {
// skip write inverted index for index compaction column
skip_inverted_index =
_opts.rowset_ctx->columns_to_do_index_compaction.count(column.unique_id()) > 0;
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index ce033cdd002..ff9c694d1b4 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -201,7 +201,8 @@ Status
VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
opts.need_bitmap_index = column.has_bitmap_index();
bool skip_inverted_index = false;
- if (_opts.rowset_ctx != nullptr) {
+ // if dual_write_inverted_index_enable is true, do not skip write inverted
index on index compaction columns
+ if (_opts.rowset_ctx != nullptr &&
!config::dual_write_inverted_index_enable) {
// skip write inverted index for index compaction column
skip_inverted_index =
_opts.rowset_ctx->columns_to_do_index_compaction.contains(column.unique_id());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]