This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 96493eb368f [fix](index compaction)Fix MOW index compaction core 
(#32085)
96493eb368f is described below

commit 96493eb368f39a1bae68b1b22042c41875cd3cad
Author: qiye <[email protected]>
AuthorDate: Tue Mar 12 16:54:52 2024 +0800

    [fix](index compaction)Fix MOW index compaction core (#32085)
    
    In a MOW table with an inverted index, when the table has deleted docs,
    the doc_id in rowid_conversion_map will be INT32_MAX.
    index_compact in CLucene does not handle this correctly.
    It causes docs to be lost for specific terms, and the postings will be incorrect.
    The index compaction process will fail.
    
    Here are the changes:
    1. Remove INT32_MAX from destPostingsQueues in CLucene so that deleted docs are handled correctly.
    2. Add debug code and switch for index compaction in Doris
    3. Add debug_index_compaction operation in index_tool
---
 be/src/clucene                    |   2 +-
 be/src/common/config.cpp          |   2 +
 be/src/common/config.h            |   2 +
 be/src/index-tools/index_tool.cpp | 126 +++++++++++++++++++++++++++++++++++++-
 be/src/olap/compaction.cpp        |  42 +++++++++++++
 5 files changed, 170 insertions(+), 4 deletions(-)

diff --git a/be/src/clucene b/be/src/clucene
index 5e4b4ca1b4e..e9c7f1f9a4a 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit 5e4b4ca1b4e307dc8d1f2415a98ae7286d956051
+Subproject commit e9c7f1f9a4a324d418eab978fa7ccbcf0878f60c
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1619cdaf2b4..f523649f649 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1008,6 +1008,8 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096");
 DEFINE_Int32(max_depth_in_bkd_tree, "32");
 // index compaction
 DEFINE_mBool(inverted_index_compaction_enable, "false");
+// Only for debug, do not use in production
+DEFINE_mBool(debug_inverted_index_compaction, "false");
 // index by RAM directory
 DEFINE_mBool(inverted_index_ram_dir_enable, "false");
 // use num_broadcast_buffer blocks as buffer to do broadcast
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a46ef7ed646..b45c03ed2b9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1042,6 +1042,8 @@ DECLARE_mInt32(inverted_index_max_buffered_docs);
 DECLARE_Int32(max_depth_in_bkd_tree);
 // index compaction
 DECLARE_mBool(inverted_index_compaction_enable);
+// Only for debug, do not use in production
+DECLARE_mBool(debug_inverted_index_compaction);
 // index by RAM directory
 DECLARE_mBool(inverted_index_ram_dir_enable);
 // use num_broadcast_buffer blocks as buffer to do broadcast
diff --git a/be/src/index-tools/index_tool.cpp 
b/be/src/index-tools/index_tool.cpp
index a5ecf9b996f..457560fdf05 100644
--- a/be/src/index-tools/index_tool.cpp
+++ b/be/src/index-tools/index_tool.cpp
@@ -20,7 +20,9 @@
 #include <gflags/gflags.h>
 
 #include <filesystem>
+#include <fstream>
 #include <iostream>
+#include <nlohmann/json.hpp>
 #include <sstream>
 #include <string>
 #include <vector>
@@ -37,7 +39,8 @@ using namespace lucene::index;
 using namespace lucene::util;
 using namespace lucene::search;
 
-DEFINE_string(operation, "", "valid operation: 
show_nested_files,check_terms,term_query");
+DEFINE_string(operation, "",
+              "valid operation: 
show_nested_files,check_terms,term_query,debug_index_compaction");
 
 DEFINE_string(directory, "./", "inverted index file directory");
 DEFINE_string(idx_file_name, "", "inverted index file name");
@@ -46,17 +49,31 @@ DEFINE_string(term, "", "inverted index term to query");
 DEFINE_string(column_name, "", "inverted index column_name to query");
 DEFINE_string(pred_type, "", "inverted index term query predicate, 
eq/lt/gt/le/ge/match etc.");
 DEFINE_bool(print_row_id, false, "print row id when query terms");
+DEFINE_bool(print_doc_id, false, "print doc id when check terms stats");
+// only for debug index compaction
+DEFINE_int32(idx_id, -1, "inverted index id");
+DEFINE_string(src_idx_dirs_file, "", "source segment index files");
+DEFINE_string(dest_idx_dirs_file, "", "destination segment index files");
+DEFINE_string(dest_seg_num_rows_file, "", "destination segment number of 
rows");
+DEFINE_string(tablet_path, "", "tablet path");
+DEFINE_string(trans_vec_file, "", "rowid conversion map file");
 
 std::string get_usage(const std::string& progname) {
     std::stringstream ss;
     ss << progname << " is the Doris inverted index file tool.\n";
-    ss << "Stop BE first before use this tool.\n";
     ss << "Usage:\n";
     ss << "./index_tool --operation=show_nested_files 
--idx_file_path=path/to/file\n";
-    ss << "./index_tool --operation=check_terms_stats 
--idx_file_path=path/to/file\n";
+    ss << "./index_tool --operation=check_terms_stats 
--idx_file_path=path/to/file "
+          "--print_doc_id\n";
     ss << "./index_tool --operation=term_query --directory=directory "
           "--idx_file_name=file --print_row_id --term=term 
--column_name=column_name "
           "--pred_type=eq/lt/gt/le/ge/match etc\n";
+    ss << "*** debug_index_compaction operation is only for offline debug 
index compaction, do not "
+          "use in production ***\n";
+    ss << "./index_tool --operation=debug_index_compaction --idx_id=index_id "
+          "--src_idx_dirs_file=path/to/file --dest_idx_dirs_file=path/to/file "
+          "--dest_seg_num_rows_file=path/to/file --tablet_path=path/to/tablet "
+          "--trans_vec_file=path/to/file\n";
     return ss.str();
 }
 
@@ -126,6 +143,14 @@ void check_terms_stats(lucene::store::Directory* dir) {
 
         printf("Term: %s ", token.c_str());
         printf("Freq: %d\n", te->docFreq());
+        if (FLAGS_print_doc_id) {
+            TermDocs* td = r->termDocs(te->term());
+            while (td->next()) {
+                printf("DocID: %d ", td->doc());
+                printf("TermFreq: %d\n", td->freq());
+            }
+            _CLLDELETE(td);
+        }
     }
     printf("Term count: %d\n\n", nterms);
     _CLLDELETE(te);
@@ -232,6 +257,101 @@ int main(int argc, char** argv) {
         } catch (CLuceneError& err) {
             std::cerr << "error occurred when check_terms_stats: " << 
err.what() << std::endl;
         }
+    } else if (FLAGS_operation == "debug_index_compaction") {
+        // only for debug index compaction, do not use in production
+        if (FLAGS_idx_id <= 0 || FLAGS_src_idx_dirs_file == "" || 
FLAGS_dest_idx_dirs_file == "" ||
+            FLAGS_dest_seg_num_rows_file == "" || FLAGS_tablet_path == "" ||
+            FLAGS_trans_vec_file == "") {
+            std::cout << "invalid params for debug_index_compaction " << 
std::endl;
+            return -1;
+        }
+
+        auto fs = doris::io::global_local_filesystem();
+
+        auto read_file_to_json = [&](const std::string& file, std::string& 
output) {
+            if (!fs->read_file_to_string(file, &output).ok()) {
+                std::cout << "read file " << file << " failed" << std::endl;
+                return false;
+            }
+            return true;
+        };
+
+        int32_t index_id = FLAGS_idx_id;
+        std::string tablet_path = FLAGS_tablet_path;
+        std::string src_index_dirs_string;
+        std::string dest_index_dirs_string;
+        std::string dest_segment_num_rows_string;
+        std::string trans_vec_string;
+
+        if (!read_file_to_json(FLAGS_src_idx_dirs_file, src_index_dirs_string) 
||
+            !read_file_to_json(FLAGS_dest_idx_dirs_file, 
dest_index_dirs_string) ||
+            !read_file_to_json(FLAGS_dest_seg_num_rows_file, 
dest_segment_num_rows_string) ||
+            !read_file_to_json(FLAGS_trans_vec_file, trans_vec_string)) {
+            return -1;
+        }
+        std::vector<std::string> src_index_files = 
nlohmann::json::parse(src_index_dirs_string);
+        std::vector<std::string> dest_index_files = 
nlohmann::json::parse(dest_index_dirs_string);
+        std::vector<uint32_t> dest_segment_num_rows =
+                nlohmann::json::parse(dest_segment_num_rows_string);
+        std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec =
+                nlohmann::json::parse(trans_vec_string);
+        int src_segment_num = src_index_files.size();
+        int dest_segment_num = dest_index_files.size();
+
+        std::string index_writer_path = tablet_path + "/tmp_index_writer";
+        lucene::store::Directory* dir =
+                DorisCompoundDirectoryFactory::getDirectory(fs, 
index_writer_path.c_str(), false);
+        lucene::analysis::SimpleAnalyzer<char> analyzer;
+        auto index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, 
true /* create */,
+                                                              true /* 
closeDirOnShutdown */);
+        std::ostream* infoStream = &std::cout;
+        index_writer->setInfoStream(infoStream);
+        // get compound directory src_index_dirs
+        std::vector<lucene::store::Directory*> src_index_dirs(src_segment_num);
+        for (int i = 0; i < src_segment_num; ++i) {
+            // format: rowsetId_segmentId_indexId.idx
+            std::string src_idx_full_name =
+                    src_index_files[i] + "_" + std::to_string(index_id) + 
".idx";
+            DorisCompoundReader* reader = new DorisCompoundReader(
+                    DorisCompoundDirectoryFactory::getDirectory(fs, 
tablet_path.c_str()),
+                    src_idx_full_name.c_str());
+            src_index_dirs[i] = reader;
+        }
+
+        // get dest idx file paths
+        std::vector<lucene::store::Directory*> 
dest_index_dirs(dest_segment_num);
+        for (int i = 0; i < dest_segment_num; ++i) {
+            // format: rowsetId_segmentId_columnId
+            auto path = tablet_path + "/" + dest_index_files[i] + "_" + 
std::to_string(index_id);
+            dest_index_dirs[i] =
+                    DorisCompoundDirectoryFactory::getDirectory(fs, 
path.c_str(), true);
+        }
+
+        index_writer->indexCompaction(src_index_dirs, dest_index_dirs, 
trans_vec,
+                                      dest_segment_num_rows);
+
+        index_writer->close();
+        _CLDELETE(index_writer);
+        // NOTE: need to ref_cnt-- for dir,
+        // when index_writer is destroyed, if closeDir is set, dir will be 
close
+        // _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, 
dir will be destroyed.
+        _CLDECDELETE(dir)
+        for (auto d : src_index_dirs) {
+            if (d != nullptr) {
+                d->close();
+                _CLDELETE(d);
+            }
+        }
+        for (auto d : dest_index_dirs) {
+            if (d != nullptr) {
+                // NOTE: DO NOT close dest dir here, because it will be closed 
when dest index writer finalize.
+                //d->close();
+                _CLDELETE(d);
+            }
+        }
+
+        // delete temporary index_writer_path
+        fs->delete_directory(index_writer_path.c_str());
     } else {
         std::cout << "invalid operation: " << FLAGS_operation << "\n" << usage 
<< std::endl;
         return -1;
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 4b6bef0c8e4..fabe83f0186 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -27,6 +27,7 @@
 #include <map>
 #include <memory>
 #include <mutex>
+#include <nlohmann/json.hpp>
 #include <numeric>
 #include <ostream>
 #include <set>
@@ -36,6 +37,7 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "io/fs/file_system.h"
+#include "io/fs/file_writer.h"
 #include "io/fs/remote_file_system.h"
 #include "olap/cumulative_compaction_policy.h"
 #include "olap/cumulative_compaction_time_series_policy.h"
@@ -472,6 +474,46 @@ Status Compaction::do_compaction_impl(int64_t permits) {
                 dest_index_files[i] = prefix;
             }
 
+            // Only write info files when debug index compaction is enabled.
+            // The files are used to debug index compaction and works with 
index_tool.
+            if (config::debug_inverted_index_compaction) {
+                auto write_json_to_file = [&](const nlohmann::json& json_obj,
+                                              const std::string& file_name) {
+                    io::FileWriterPtr file_writer;
+                    std::string file_path =
+                            fmt::format("{}/{}.json", config::sys_log_dir, 
file_name);
+                    RETURN_IF_ERROR(
+                            
io::global_local_filesystem()->create_file(file_path, &file_writer));
+                    RETURN_IF_ERROR(file_writer->append(json_obj.dump()));
+                    RETURN_IF_ERROR(file_writer->append("\n"));
+                    return file_writer->close();
+                };
+
+                // Convert trans_vec to JSON and print it
+                nlohmann::json trans_vec_json = trans_vec;
+                auto output_version = _output_version.to_string().substr(
+                        1, _output_version.to_string().size() - 2);
+                RETURN_IF_ERROR(write_json_to_file(
+                        trans_vec_json,
+                        fmt::format("trans_vec_{}_{}", _tablet->tablet_id(), 
output_version)));
+
+                nlohmann::json src_index_files_json = src_index_files;
+                RETURN_IF_ERROR(write_json_to_file(
+                        src_index_files_json,
+                        fmt::format("src_idx_dirs_{}_{}", 
_tablet->tablet_id(), output_version)));
+
+                nlohmann::json dest_index_files_json = dest_index_files;
+                RETURN_IF_ERROR(write_json_to_file(
+                        dest_index_files_json,
+                        fmt::format("dest_idx_dirs_{}_{}", 
_tablet->tablet_id(), output_version)));
+
+                nlohmann::json dest_segment_num_rows_json = 
dest_segment_num_rows;
+                RETURN_IF_ERROR(
+                        write_json_to_file(dest_segment_num_rows_json,
+                                           
fmt::format("dest_seg_num_rows_{}_{}",
+                                                       _tablet->tablet_id(), 
output_version)));
+            }
+
             // create index_writer to compaction indexes
             auto& fs = _output_rowset->rowset_meta()->fs();
             auto& tablet_path = _tablet->tablet_path();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to