This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 133149310e4 [fix](inverted index) inverted Index File Cache Queue
Optimization (#46518)
133149310e4 is described below
commit 133149310e4f8e85de1d078715190fdbfdad0524
Author: zzzxl <[email protected]>
AuthorDate: Tue Jan 7 19:14:51 2025 +0800
[fix](inverted index) inverted Index File Cache Queue Optimization (#46518)
https://github.com/apache/doris/pull/46024
---
.../segment_v2/inverted_index_compound_reader.cpp | 41 ++++++---
.../olap/rowset/segment_v2/inverted_index_desc.cpp | 6 ++
.../olap/rowset/segment_v2/inverted_index_desc.h | 4 +
.../segment_v2/inverted_index_file_reader.cpp | 2 +
.../segment_v2/inverted_index_file_writer.cpp | 15 ++-
.../segment_v2/inverted_index_fs_directory.cpp | 16 ----
.../rowset/segment_v2/inverted_index_reader.cpp | 4 +-
.../segment_v2/inverted_index_file_writer_test.cpp | 30 +++---
.../test_index_file_cache_fault_injection.out | 52 +++++++++++
.../test_index_file_cache_fault_injection.groovy | 101 +++++++++++++++++++++
10 files changed, 223 insertions(+), 48 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index c30017cc8fe..7a993daacf1 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -57,17 +57,18 @@ namespace segment_v2 {
class CSIndexInput : public lucene::store::BufferedIndexInput {
private:
CL_NS(store)::IndexInput* base;
+ std::string file_name;
int64_t fileOffset;
int64_t _length;
const io::IOContext* _io_ctx = nullptr;
- bool _is_index_file = false; // Indicates if the file is a TII file
protected:
void readInternal(uint8_t* /*b*/, const int32_t /*len*/) override;
void seekInternal(const int64_t /*pos*/) override {}
public:
- CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset,
const int64_t length,
+ CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& file_name,
+ const int64_t fileOffset, const int64_t length,
const int32_t read_buffer_size =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
CSIndexInput(const CSIndexInput& clone);
~CSIndexInput() override;
@@ -78,13 +79,14 @@ public:
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "CSIndexInput"; }
void setIoContext(const void* io_ctx) override;
- void setIndexFile(bool isIndexFile) override;
};
-CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t
fileOffset,
- const int64_t length, const int32_t
read_buffer_size)
+CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const std::string&
file_name,
+ const int64_t fileOffset, const int64_t length,
+ const int32_t read_buffer_size)
: BufferedIndexInput(read_buffer_size) {
this->base = base;
+ this->file_name = file_name;
this->fileOffset = fileOffset;
this->_length = length;
}
@@ -101,7 +103,27 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t
len) {
base->setIoContext(_io_ctx);
}
- base->setIndexFile(_is_index_file);
+ DBUG_EXECUTE_IF("CSIndexInput.readInternal", {
+ for (const auto& entry : InvertedIndexDescriptor::index_file_info_map)
{
+ if (file_name.find(entry.first) != std::string::npos) {
+ if (!static_cast<const
io::IOContext*>(base->getIoContext())->is_index_data) {
+ _CLTHROWA(CL_ERR_IO,
+ "The 'is_index_data' flag should be true for
inverted index meta "
+ "files.");
+ }
+ }
+ }
+ for (const auto& entry :
InvertedIndexDescriptor::normal_file_info_map) {
+ if (file_name.find(entry.first) != std::string::npos) {
+ if (static_cast<const
io::IOContext*>(base->getIoContext())->is_index_data) {
+ _CLTHROWA(CL_ERR_IO,
+ "The 'is_index_data' flag should be false for
non-meta inverted "
+ "index files.");
+ }
+ }
+ }
+ });
+
base->seek(fileOffset + start);
bool read_from_buffer = true;
base->readBytes(b, len, read_from_buffer);
@@ -119,6 +141,7 @@ lucene::store::IndexInput* CSIndexInput::clone() const {
CSIndexInput::CSIndexInput(const CSIndexInput& clone) :
BufferedIndexInput(clone) {
this->base = clone.base;
+ this->file_name = clone.file_name;
this->fileOffset = clone.fileOffset;
this->_length = clone._length;
}
@@ -129,10 +152,6 @@ void CSIndexInput::setIoContext(const void* io_ctx) {
_io_ctx = static_cast<const io::IOContext*>(io_ctx);
}
-void CSIndexInput::setIndexFile(bool isIndexFile) {
- _is_index_file = isIndexFile;
-}
-
DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream,
int32_t read_buffer_size)
: _ram_dir(new lucene::store::RAMDirectory()),
_stream(stream),
@@ -312,7 +331,7 @@ bool DorisCompoundReader::openInput(const char* name,
lucene::store::IndexInput*
bufferSize = _read_buffer_size;
}
- ret = _CLNEW CSIndexInput(_stream, entry->offset, entry->length,
bufferSize);
+ ret = _CLNEW CSIndexInput(_stream, entry->file_name, entry->offset,
entry->length, bufferSize);
return true;
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp b/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp
index 8eac73f13a6..e909bc1e0a9 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp
@@ -24,6 +24,12 @@
namespace doris::segment_v2 {
+const std::unordered_map<std::string, int32_t>
InvertedIndexDescriptor::index_file_info_map = {
+ {"null_bitmap", 1}, {"segments.gen", 2}, {"segments_", 3}, {"fnm", 4},
{"tii", 5}};
+
+const std::unordered_map<std::string, int32_t>
InvertedIndexDescriptor::normal_file_info_map = {
+ {"tis", 1}, {"frq", 2}, {"prx", 3}};
+
// {tmp_dir}/{rowset_id}_{seg_id}_{index_id}@{suffix}
std::string InvertedIndexDescriptor::get_temporary_index_path(std::string_view
tmp_dir_path,
std::string_view
rowset_id,
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_desc.h b/be/src/olap/rowset/segment_v2/inverted_index_desc.h
index 37f9cf3f4a1..f421c7f3790 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_desc.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_desc.h
@@ -20,6 +20,7 @@
#include <stdint.h>
#include <string>
+#include <unordered_map>
namespace doris {
struct RowsetId;
@@ -28,6 +29,9 @@ namespace segment_v2 {
class InvertedIndexDescriptor {
public:
+ static const std::unordered_map<std::string, int32_t> index_file_info_map;
+ static const std::unordered_map<std::string, int32_t> normal_file_info_map;
+
static constexpr std::string_view segment_suffix = ".dat";
static constexpr std::string_view index_suffix = ".idx";
static std::string get_temporary_index_path(std::string_view tmp_dir_path,
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
index 5306f2956c2..3629cfbbdfa 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
@@ -41,6 +41,7 @@ Status InvertedIndexFileReader::init(int32_t
read_buffer_size, const io::IOConte
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
if (_stream) {
_stream->setIoContext(io_ctx);
+ _stream->setIndexFile(true);
}
}
}
@@ -83,6 +84,7 @@ Status InvertedIndexFileReader::_init_from(int32_t
read_buffer_size, const io::I
}
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
_stream->setIoContext(io_ctx);
+ _stream->setIndexFile(true);
// 3. read file
int32_t version = _stream->readInt(); // Read version number
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index bb373be5ee9..73c3f1b65d4 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -171,17 +171,14 @@ Status InvertedIndexFileWriter::close() {
void InvertedIndexFileWriter::sort_files(std::vector<FileInfo>& file_infos) {
auto file_priority = [](const std::string& filename) {
- if (filename.find("segments") != std::string::npos) {
- return 1;
- }
- if (filename.find("fnm") != std::string::npos) {
- return 2;
- }
- if (filename.find("tii") != std::string::npos) {
- return 3;
+ for (const auto& entry : InvertedIndexDescriptor::index_file_info_map)
{
+ if (filename.find(entry.first) != std::string::npos) {
+ return entry.second;
+ }
}
- return 4; // Other files
+ return 6; // Other files
};
+
std::sort(file_infos.begin(), file_infos.end(), [&](const FileInfo& a,
const FileInfo& b) {
int32_t priority_a = file_priority(a.filename);
int32_t priority_b = file_priority(b.filename);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index 4befeba8991..e06ce69b7b2 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -201,22 +201,6 @@ lucene::store::IndexInput*
DorisFSDirectory::FSIndexInput::clone() const {
}
void DorisFSDirectory::FSIndexInput::close() {
BufferedIndexInput::close();
- /*if (_handle != nullptr) {
- std::mutex* lock = _handle->_shared_lock;
- bool ref = false;
- {
- std::lock_guard<std::mutex> wlock(*lock);
- //determine if we are about to delete the handle...
- ref = (_LUCENE_ATOMIC_INT_GET(_handle->__cl_refcount) > 1);
- //decdelete (deletes if refcount is down to 0
- _CLDECDELETE(_handle);
- }
-
- //if _handle is not ref by other FSIndexInput, try to release mutex
lock, or it will be leaked.
- if (!ref) {
- delete lock;
- }
- }*/
}
void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
index 1c7f83b29a6..a75a9462f91 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -213,7 +213,9 @@ Status
InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir,
*searcher = searcher_result;
// When the meta information has been read, the ioContext needs to be
reset to prevent it from being used by other queries.
-
static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput()->setIoContext(nullptr);
+ auto stream = static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput();
+ stream->setIoContext(nullptr);
+ stream->setIndexFile(false);
// NOTE: before mem_tracker hook becomes active, we caculate reader memory
size by hand.
mem_tracker->consume(index_searcher_builder->get_reader_size());
diff --git a/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
index dd3b4195c14..2320108af2c 100644
--- a/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
+++ b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
@@ -335,7 +335,8 @@ TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest)
{
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path).ok());
mock_dir->init(_fs, local_fs_index_path.c_str());
- std::vector<std::string> files = {"0.segments", "0.fnm", "0.tii",
"nullbitmap", "write.lock"};
+ std::vector<std::string> files = {"segments_0", "segments.gen", "0.fnm",
+ "0.tii", "null_bitmap",
"write.lock"};
for (auto& file : files) {
auto out_file_1 =
std::unique_ptr<lucene::store::IndexOutput>(mock_dir->createOutput(file.c_str()));
@@ -343,11 +344,14 @@ TEST_F(InvertedIndexFileWriterTest,
PrepareSortedFilesTest) {
out_file_1->close();
}
- EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.segments")))
+ EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("segments_0")))
.WillOnce(testing::Return(1000));
+ EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("segments.gen")))
+ .WillOnce(testing::Return(1200));
EXPECT_CALL(*mock_dir,
fileLength(testing::StrEq("0.fnm"))).WillOnce(testing::Return(2000));
EXPECT_CALL(*mock_dir,
fileLength(testing::StrEq("0.tii"))).WillOnce(testing::Return(1500));
- EXPECT_CALL(*mock_dir,
fileLength(testing::StrEq("nullbitmap"))).WillOnce(testing::Return(500));
+ EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("null_bitmap")))
+ .WillOnce(testing::Return(500));
InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id,
_seg_id,
InvertedIndexStorageFormatPB::V2);
@@ -362,24 +366,28 @@ TEST_F(InvertedIndexFileWriterTest,
PrepareSortedFilesTest) {
std::vector<FileInfo> sorted_files =
writer.prepare_sorted_files(writer._indices_dirs[std::make_pair(1,
"suffix1")].get());
- // 1. 0.segments (priority 1, size 1000)
- // 2. 0.fnm (priority 2, size 2000)
- // 3. 0.tii (priority 3, size 1500)
- // 4. nullbitmap (priority 4, size 500)
+ // 1. null_bitmap (priority 1, size 500)
+ // 2. segments.gen (priority 2, size 1200)
+ // 3. segments_0 (priority 3, size 1000)
+ // 4. 0.fnm (priority 4, size 2000)
+ // 5. 0.tii (priority 5, size 1500)
- std::vector<std::string> expected_order = {"0.segments", "0.fnm", "0.tii",
"nullbitmap"};
+ std::vector<std::string> expected_order = {"null_bitmap", "segments.gen",
"segments_0", "0.fnm",
+ "0.tii"};
ASSERT_EQ(sorted_files.size(), expected_order.size());
for (size_t i = 0; i < expected_order.size(); ++i) {
EXPECT_EQ(sorted_files[i].filename, expected_order[i]);
- if (sorted_files[i].filename == "0.segments") {
+ if (sorted_files[i].filename == "null_bitmap") {
+ EXPECT_EQ(sorted_files[i].filesize, 500);
+ } else if (sorted_files[i].filename == "segments.gen") {
+ EXPECT_EQ(sorted_files[i].filesize, 1200);
+ } else if (sorted_files[i].filename == "segments_0") {
EXPECT_EQ(sorted_files[i].filesize, 1000);
} else if (sorted_files[i].filename == "0.fnm") {
EXPECT_EQ(sorted_files[i].filesize, 2000);
} else if (sorted_files[i].filename == "0.tii") {
EXPECT_EQ(sorted_files[i].filesize, 1500);
- } else if (sorted_files[i].filename == "nullbitmap") {
- EXPECT_EQ(sorted_files[i].filesize, 500);
}
}
}
diff --git a/regression-test/data/fault_injection_p0/test_index_file_cache_fault_injection.out b/regression-test/data/fault_injection_p0/test_index_file_cache_fault_injection.out
new file mode 100644
index 00000000000..b096fdedd12
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_index_file_cache_fault_injection.out
@@ -0,0 +1,52 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1790
+
+-- !sql --
+2000
+
+-- !sql --
+4
+
+-- !sql --
+58
+
+-- !sql --
+0
+
+-- !sql --
+16
+
+-- !sql --
+12
+
+-- !sql --
+16
+
+-- !sql --
+12
+
+-- !sql --
+10
+
+-- !sql --
+88
+
+-- !sql --
+648
+
+-- !sql --
+386
+
+-- !sql --
+78
+
+-- !sql --
+746
+
+-- !sql --
+476
+
+-- !sql --
+2000
+
diff --git a/regression-test/suites/fault_injection_p0/test_index_file_cache_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_file_cache_fault_injection.groovy
new file mode 100644
index 00000000000..8a04c15b839
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_index_file_cache_fault_injection.groovy
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_index_file_cache_fault_injection", "nonConcurrent") {
+ def indexTbName = "test_index_file_cache_fault_injection"
+
+ sql "DROP TABLE IF EXISTS ${indexTbName}"
+ sql """
+ CREATE TABLE ${indexTbName} (
+ `@timestamp` int(11) NULL COMMENT "",
+ `clientip` varchar(20) NULL COMMENT "",
+ `request` text NULL COMMENT "",
+ `status` int(11) NULL COMMENT "",
+ `size` int(11) NULL COMMENT "",
+ INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" =
"english", "support_phrase" = "true") COMMENT '',
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`@timestamp`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY RANDOM BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ def load_httplogs_data = {table_name, label, read_flag, format_flag,
file_name, ignore_failure=false,
+ expected_succ_rows = -1, load_to_single_tablet =
'true' ->
+
+ // load the json data
+ streamLoad {
+ table "${table_name}"
+
+ // set http request header params
+ set 'label', label + "_" + UUID.randomUUID().toString()
+ set 'read_json_by_line', read_flag
+ set 'format', format_flag
+ file file_name // import json file
+ time 10000 // limit inflight 10s
+ if (expected_succ_rows >= 0) {
+ set 'max_filter_ratio', '1'
+ }
+
+ // if declared a check callback, the default check condition will
ignore.
+ // So you must check all condition
+ check { result, exception, startTime, endTime ->
+ if (ignore_failure && expected_succ_rows < 0) { return }
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ }
+ }
+ }
+
+ try {
+ load_httplogs_data.call(indexTbName,
'test_index_file_cache_fault_injection', 'true', 'json', 'documents-1000.json')
+ load_httplogs_data.call(indexTbName,
'test_index_file_cache_fault_injection', 'true', 'json', 'documents-1000.json')
+
+ sql "sync"
+ sql """ set enable_common_expr_pushdown = true; """
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("CSIndexInput.readInternal")
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix '0'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix '1'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix '2'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix '3'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix '4'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix '5'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix '6'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix '7'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix '8'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix '9'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix 'a'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix 'b'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix 'c'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix 'd'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix 'e'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix 'f'; """
+ qt_sql """ select count() from ${indexTbName} where request
match_phrase_prefix 'g'; """
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("CSIndexInput.readInternal")
+ }
+ } finally {
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]