This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 03b7afda992 [fix](inverted index) Split bound multi-segment readers (#63138)
03b7afda992 is described below
commit 03b7afda9927f552eebb99d478f2f76f49942099
Author: Jack <[email protected]>
AuthorDate: Mon May 25 17:01:21 2026 +0800
[fix](inverted index) Split bound multi-segment readers (#63138)
### What problem does this PR solve?
Issue Number: DORIS-25499
Related PR: None
Problem Summary:
query_v2 search collection can drive segment iteration from
`readers.front()` while leaf scorers resolve their CLucene readers from
`reader_bindings` or `field_reader_bindings`. If the reader a leaf actually
uses is still a multi-segment reader, `SegmentPostings` calls
`TermDocs::readBlock()`, which CLucene does not support on multi-segment
readers; the call can abort the BE process.
This PR selects the actual segmented reader from the execution context,
rewrites all readers and reader bindings to segment-level readers for
each callback, validates segmented reader topology, and keeps
`MultiSegmentReader` / `MultiReader` doc base handling explicit. It also
adds regression coverage for `MultiReader`, segmented field bindings,
and explicit single-reader binding keys.
### Release note
None
### Check List (For Author)
- Test
- [x] Regression test
- Added `MultiSegmentCollectorTest.CollectDocSetWithMultiReader`
- Added `MultiSegmentCollectorTest.CollectDocSetWithSegmentedFieldBinding`
- Added `MultiSegmentCollectorTest.CollectDocSetWithSingleReaderBinding`
- [x] Unit Test
- `env CCACHE_DIR=/tmp/doris25499-master-ccache CCACHE_TEMPDIR=/tmp/doris25499-master-ccache-tmp ./run-be-ut.sh --run --filter=MultiSegmentCollectorTest.* -j 32`
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason
- Behavior changed:
- [x] No.
- [ ] Yes.
- Does this need documentation?
- [x] No.
- [ ] Yes.
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label
---
.../inverted/query_v2/collect/multi_segment_util.h | 153 ++++++++++++---
.../query_v2/multi_segment_collector_test.cpp | 216 +++++++++++++++++++++
2 files changed, 347 insertions(+), 22 deletions(-)
diff --git
a/be/src/storage/index/inverted/query_v2/collect/multi_segment_util.h
b/be/src/storage/index/inverted/query_v2/collect/multi_segment_util.h
index 0519fe172c9..4e67b5aaacc 100644
--- a/be/src/storage/index/inverted/query_v2/collect/multi_segment_util.h
+++ b/be/src/storage/index/inverted/query_v2/collect/multi_segment_util.h
@@ -31,6 +31,7 @@
#pragma GCC diagnostic ignored "-Woverloaded-virtual"
#endif
#include "CLucene.h"
+#include "CLucene/index/MultiReader.h"
#include "CLucene/index/_MultiSegmentReader.h"
#ifdef __clang__
#pragma clang diagnostic pop
@@ -40,18 +41,133 @@
namespace doris::segment_v2::inverted_index::query_v2 {
-inline QueryExecutionContext
create_segment_context(lucene::index::IndexReader* seg_reader,
- const
QueryExecutionContext& original_ctx,
+// Wraps `reader` in a shared_ptr whose deleter does nothing, so releasing the
+// returned handle never closes or frees the reader. The caller must keep the
+// owning reader alive for as long as the handle is in use.
+inline std::shared_ptr<lucene::index::IndexReader> non_owning_reader(
+ lucene::index::IndexReader* reader) {
+ return {reader, [](lucene::index::IndexReader*) {}};
+}
+
+// Returns the per-segment sub-readers of `reader` when it is a
+// MultiSegmentReader or a MultiReader. Returns nullptr for any other reader
+// type, and for a null `reader`, since dynamic_cast of a null pointer yields
+// null. A non-null result is what the rest of this header treats as a
+// "segmented" reader.
+inline const lucene::util::ArrayBase<lucene::index::IndexReader*>* sub_readers(
+ lucene::index::IndexReader* reader) {
+ if (auto* multi_segment_reader =
dynamic_cast<lucene::index::MultiSegmentReader*>(reader)) {
+ return multi_segment_reader->getSubReaders();
+ }
+ if (auto* multi_reader =
dynamic_cast<lucene::index::MultiReader*>(reader)) {
+ return multi_reader->getSubReaders();
+ }
+ return nullptr;
+}
+
+// Returns the doc-id start offsets (getStarts()) of a MultiSegmentReader, or
+// nullptr for every other reader type. MultiReader is deliberately not handled
+// here; segment_base() recomputes its offsets from the sub-readers' maxDoc().
+inline const int32_t* segment_starts(lucene::index::IndexReader* reader) {
+ if (auto* multi_segment_reader =
dynamic_cast<lucene::index::MultiSegmentReader*>(reader)) {
+ return multi_segment_reader->getStarts();
+ }
+ return nullptr;
+}
+
+// Returns the global doc id of the first document in sub-reader
+// `segment_index` of the segmented `reader`.
+// - MultiSegmentReader: uses its starts[] array.
+// - Otherwise (MultiReader): sums maxDoc() of the preceding sub-readers, which
+//   costs O(segment_index) per call.
+// Preconditions: `reader` is segmented (checked only by DCHECK) and
+// `segment_index` is a valid sub-reader index (not checked here).
+inline uint32_t segment_base(lucene::index::IndexReader* reader, size_t
segment_index) {
+ const auto* starts = segment_starts(reader);
+ if (starts != nullptr) {
+ return static_cast<uint32_t>(starts[segment_index]);
+ }
+
+ // segment_starts() only covers MultiSegmentReader, so rebuild the prefix sum
+ // of maxDoc() over the sub-readers that precede `segment_index`.
+ const auto* segments = sub_readers(reader);
+ DCHECK(segments != nullptr);
+ uint32_t base = 0;
+ for (size_t i = 0; i < segment_index; ++i) {
+ base += (*segments)[i]->maxDoc();
+ }
+ return base;
+}
+
+// Chooses the segmented reader whose sub-readers drive per-segment iteration.
+// - If `binding_key` is non-empty and present in `reader_bindings`, that
+//   binding is authoritative. Its reader is returned when segmented; otherwise
+//   nullptr is returned without looking at any other reader.
+// - Otherwise, returns the first segmented reader found in `readers`, then
+//   `reader_bindings`, then `field_reader_bindings`.
+// - Returns nullptr when no reader is segmented.
+// NOTE(review): when the bound reader is not segmented, segmented readers in
+// `field_reader_bindings` are never considered. Confirm that no leaf of such a
+// query resolves its reader through them.
+inline std::shared_ptr<lucene::index::IndexReader> find_segmented_reader(
+ const QueryExecutionContext& context, const std::string& binding_key) {
+ if (!binding_key.empty()) {
+ if (auto it = context.reader_bindings.find(binding_key);
+ it != context.reader_bindings.end()) {
+ return sub_readers(it->second.get()) != nullptr ? it->second :
nullptr;
+ }
+ }
+
+ // No authoritative binding: first segmented reader wins. If several are
+ // segmented, their topologies are expected to agree.
+ for (const auto& reader : context.readers) {
+ if (sub_readers(reader.get()) != nullptr) {
+ return reader;
+ }
+ }
+ for (const auto& [_, reader] : context.reader_bindings) {
+ if (sub_readers(reader.get()) != nullptr) {
+ return reader;
+ }
+ }
+ for (const auto& [_, reader] : context.field_reader_bindings) {
+ if (sub_readers(reader.get()) != nullptr) {
+ return reader;
+ }
+ }
+ return nullptr;
+}
+
+// Checks that a segmented `reader` matches `driver_reader` segment for segment:
+// same sub-reader count, same doc base and same maxDoc() per sub-reader.
+// Non-segmented readers are skipped, since they are shared unchanged across
+// segments.
+// NOTE(review): every check is a DCHECK. If DCHECK is compiled out (glog's
+// release-build behaviour), a mismatch is not reported here at all. Confirm
+// whether a runtime error is wanted instead.
+inline void validate_segment_topology(
+ const std::shared_ptr<lucene::index::IndexReader>& reader,
+ const std::shared_ptr<lucene::index::IndexReader>& driver_reader) {
+ const auto* segments = sub_readers(reader.get());
+ if (segments == nullptr) {
+ return;
+ }
+
+ const auto* driver_segments = sub_readers(driver_reader.get());
+ DCHECK(driver_segments != nullptr);
+ DCHECK_EQ(segments->length, driver_segments->length);
+ for (size_t i = 0; i < driver_segments->length; ++i) {
+ DCHECK_EQ(segment_base(reader.get(), i),
segment_base(driver_reader.get(), i));
+ DCHECK_EQ((*segments)[i]->maxDoc(), (*driver_segments)[i]->maxDoc());
+ }
+}
+
+// Runs validate_segment_topology() against `driver_reader` for every reader
+// that create_segment_context() splits per segment. That covers `readers`,
+// `reader_bindings` and `field_reader_bindings`.
+inline void validate_segment_topologies(
+ const QueryExecutionContext& context,
+ const std::shared_ptr<lucene::index::IndexReader>& driver_reader) {
+ for (const auto& reader : context.readers) {
+ validate_segment_topology(reader, driver_reader);
+ }
+ for (const auto& [_, reader] : context.reader_bindings) {
+ validate_segment_topology(reader, driver_reader);
+ }
+ for (const auto& [_, reader] : context.field_reader_bindings) {
+ validate_segment_topology(reader, driver_reader);
+ }
+}
+
+// Returns the reader to use for segment `segment_index`.
+// - Segmented `reader`: a non-owning handle to that sub-reader, valid only
+//   while `reader` stays alive.
+// - Any other reader: returned unchanged, so it is shared by every segment.
+// The index bound is only DCHECKed.
+inline std::shared_ptr<lucene::index::IndexReader> reader_for_segment(
+ const std::shared_ptr<lucene::index::IndexReader>& reader, size_t
segment_index) {
+ const auto* segments = sub_readers(reader.get());
+ if (segments != nullptr) {
+ DCHECK_LT(segment_index, segments->length);
+ return non_owning_reader((*segments)[segment_index]);
+ }
+ return reader;
+}
+
+inline QueryExecutionContext create_segment_context(const
QueryExecutionContext& original_ctx,
+ size_t segment_index,
uint32_t segment_num_rows,
const std::string&
binding_key) {
QueryExecutionContext seg_ctx;
- seg_ctx.segment_num_rows = seg_reader->numDocs();
- auto reader_ptr = std::shared_ptr<lucene::index::IndexReader>(
- seg_reader, [](lucene::index::IndexReader*) {});
- seg_ctx.readers.push_back(reader_ptr);
+ for (const auto& reader : original_ctx.readers) {
+ seg_ctx.readers.push_back(reader_for_segment(reader, segment_index));
+ }
+
+ seg_ctx.segment_num_rows = segment_num_rows;
- if (!binding_key.empty()) {
- seg_ctx.reader_bindings[binding_key] = reader_ptr;
+ for (const auto& [key, reader] : original_ctx.reader_bindings) {
+ seg_ctx.reader_bindings[key] = reader_for_segment(reader,
segment_index);
+ }
+ for (const auto& [field, reader] : original_ctx.field_reader_bindings) {
+ seg_ctx.field_reader_bindings[field] = reader_for_segment(reader,
segment_index);
+ }
+
+ if (!binding_key.empty() && !seg_ctx.readers.empty() &&
+ seg_ctx.reader_bindings.find(binding_key) ==
seg_ctx.reader_bindings.end()) {
+ seg_ctx.reader_bindings[binding_key] = seg_ctx.readers.front();
}
seg_ctx.binding_fields = original_ctx.binding_fields;
@@ -63,8 +179,8 @@ inline QueryExecutionContext
create_segment_context(lucene::index::IndexReader*
template <typename SegmentCallback>
void for_each_index_segment(const QueryExecutionContext& context, const
std::string& binding_key,
SegmentCallback&& callback) {
- auto* reader = context.readers.empty() ? nullptr :
context.readers.front().get();
- if (!reader) {
+ auto segmented_reader = find_segmented_reader(context, binding_key);
+ if (!segmented_reader) {
// No reader available (e.g., AllQuery/MatchAllDocsQuery which doesn't
resolve fields).
// Fall back to using the original context directly, as AllScorer only
needs segment_num_rows.
if (context.segment_num_rows > 0) {
@@ -73,23 +189,16 @@ void for_each_index_segment(const QueryExecutionContext&
context, const std::str
return;
}
- auto* multi_reader =
dynamic_cast<lucene::index::MultiSegmentReader*>(reader);
- if (multi_reader == nullptr) {
- callback(context, 0);
- return;
- }
-
- const auto* sub_readers = multi_reader->getSubReaders();
- const auto* starts = multi_reader->getStarts();
-
+ const auto* sub_readers = query_v2::sub_readers(segmented_reader.get());
if (!sub_readers || sub_readers->length == 0) {
return;
}
+ validate_segment_topologies(context, segmented_reader);
for (size_t i = 0; i < sub_readers->length; ++i) {
- auto* seg_reader = (*sub_readers)[i];
- auto seg_base = static_cast<uint32_t>(starts[i]);
- QueryExecutionContext seg_ctx = create_segment_context(seg_reader,
context, binding_key);
+ auto seg_base = segment_base(segmented_reader.get(), i);
+ QueryExecutionContext seg_ctx =
+ create_segment_context(context, i,
(*sub_readers)[i]->numDocs(), binding_key);
callback(seg_ctx, seg_base);
}
}
diff --git
a/be/test/storage/index/inverted/query_v2/multi_segment_collector_test.cpp
b/be/test/storage/index/inverted/query_v2/multi_segment_collector_test.cpp
new file mode 100644
index 00000000000..4a04421c78a
--- /dev/null
+++ b/be/test/storage/index/inverted/query_v2/multi_segment_collector_test.cpp
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <CLucene.h>
+#include <CLucene/index/MultiReader.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <roaring/roaring.hh>
+#include <string>
+#include <vector>
+
+#include "io/fs/local_file_system.h"
+#include "storage/index/index_query_context.h"
+#include "storage/index/inverted/analyzer/custom_analyzer.h"
+#include "storage/index/inverted/query_v2/collect/doc_set_collector.h"
+#include "storage/index/inverted/query_v2/collect/multi_segment_util.h"
+#include "storage/index/inverted/query_v2/term_query/term_query.h"
+#include "storage/index/inverted/util/string_helper.h"
+
+CL_NS_USE(index)
+CL_NS_USE(store)
+CL_NS_USE(util)
+
+namespace doris::segment_v2 {
+
+using namespace inverted_index;
+using namespace inverted_index::query_v2;
+
+// Fixture that writes two small single-segment indexes, each with a tokenized
+// "title" field, under kTestDir:
+// - segment0 = {"fleabag premiere", "other title"}
+// - segment1 = {"history text", "fleabag finale"}
+// The directory tree is recreated in SetUp() and deleted in TearDown().
+class MultiSegmentCollectorTest : public testing::Test {
+public:
+ const std::string kTestDir = "./ut_dir/multi_segment_collector_test";
+
+ void SetUp() override {
+ // Start from an empty directory so indexes left by a previous run cannot leak in.
+ auto st = io::global_local_filesystem()->delete_directory(kTestDir);
+ ASSERT_TRUE(st.ok()) << st;
+ st = io::global_local_filesystem()->create_directory(kTestDir);
+ ASSERT_TRUE(st.ok()) << st;
+ st = io::global_local_filesystem()->create_directory(kTestDir +
"/segment0");
+ ASSERT_TRUE(st.ok()) << st;
+ st = io::global_local_filesystem()->create_directory(kTestDir +
"/segment1");
+ ASSERT_TRUE(st.ok()) << st;
+
+ create_test_index(kTestDir + "/segment0", {"fleabag premiere", "other
title"});
+ create_test_index(kTestDir + "/segment1", {"history text", "fleabag
finale"});
+ }
+
+ void TearDown() override {
+
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(kTestDir).ok());
+ }
+
+ // TEST_F bodies are compiled as subclasses of this fixture, and
+ // CollectDocSetWithSegmentedFieldBinding calls create_test_index() directly.
+ // The helper must therefore be protected: a private member is only reachable
+ // from those bodies when the build disables access control.
+protected:
+ // Indexes each string in `docs` as one document into a fresh non-compound
+ // index at `dir`. Each document has a single "title" field tokenized by the
+ // standard tokenizer.
+ // max_buffered_docs is the per-segment flush threshold:
+ // - RAM-based flushing is switched off (-1).
+ // - The very large merge factor is meant to prevent merges.
+ // A small value therefore produces a multi-segment index.
+ static void create_test_index(const std::string& dir, const
std::vector<std::string>& docs,
+ int32_t max_buffered_docs = 100) {
+ CustomAnalyzerConfig::Builder builder;
+ builder.with_tokenizer_config("standard", {});
+ auto custom_analyzer_config = builder.build();
+ auto custom_analyzer =
CustomAnalyzer::build_custom_analyzer(custom_analyzer_config);
+
+ auto* index_writer =
+ _CLNEW lucene::index::IndexWriter(dir.c_str(),
custom_analyzer.get(), true);
+ index_writer->setMaxBufferedDocs(max_buffered_docs);
+ index_writer->setRAMBufferSizeMB(-1);
+ index_writer->setMaxFieldLength(0x7FFFFFFFL);
+ index_writer->setMergeFactor(1000000000);
+ index_writer->setUseCompoundFile(false);
+
+ // One Document/Field pair is reused; only the field's token stream changes per doc.
+ auto char_string_reader =
std::make_shared<lucene::util::SStringReader<char>>();
+ auto* doc = _CLNEW lucene::document::Document();
+ int32_t field_config = lucene::document::Field::STORE_NO;
+ field_config |= lucene::document::Field::INDEX_NONORMS;
+ field_config |= lucene::document::Field::INDEX_TOKENIZED;
+ auto field_name_w = StringHelper::to_wstring("title");
+ auto* field = _CLNEW lucene::document::Field(field_name_w.c_str(),
field_config);
+ field->setOmitTermFreqAndPositions(false);
+ doc->add(*field);
+
+ for (const auto& data : docs) {
+ char_string_reader->init(data.data(), data.size(), false);
+ auto* stream = custom_analyzer->reusableTokenStream(field->name(),
char_string_reader);
+ field->setValue(stream);
+ index_writer->addDocument(doc);
+ }
+
+ index_writer->close();
+ _CLLDELETE(index_writer);
+ _CLLDELETE(doc);
+ }
+};
+
+// Takes ownership of `raw_reader`: when the last copy of the returned
+// shared_ptr is released, the reader is closed and then deleted. The null check
+// is there because shared_ptr invokes a custom deleter even for a null pointer.
+static std::shared_ptr<lucene::index::IndexReader> make_shared_reader(
+ lucene::index::IndexReader* raw_reader) {
+ return {raw_reader, [](lucene::index::IndexReader* reader) {
+ if (reader != nullptr) {
+ reader->close();
+ _CLDELETE(reader);
+ }
+ }};
+}
+
+// Drives collection with a MultiReader over segment0 + segment1. The same
+// reader is both readers.front() and the "title" field binding.
+// "fleabag" occurs in doc 0 of segment0 and in doc 1 of segment1. Segment1's
+// doc base is segment0's maxDoc() (2), so the expected global hits are 0 and 3.
+TEST_F(MultiSegmentCollectorTest, CollectDocSetWithMultiReader) {
+ auto* dir0 = FSDirectory::getDirectory((kTestDir + "/segment0").c_str());
+ auto* dir1 = FSDirectory::getDirectory((kTestDir + "/segment1").c_str());
+
+ ValueArray<lucene::index::IndexReader*> readers(2);
+ readers[0] = lucene::index::IndexReader::open(dir0, true);
+ readers[1] = lucene::index::IndexReader::open(dir1, true);
+ auto reader = make_shared_reader(_CLNEW
lucene::index::MultiReader(&readers, true));
+
+ auto index_query_context = std::make_shared<IndexQueryContext>();
+ auto field = StringHelper::to_wstring("title");
+ TermQuery query(index_query_context, field,
StringHelper::to_wstring("fleabag"));
+ auto weight = query.weight(false);
+
+ QueryExecutionContext exec_ctx;
+ exec_ctx.segment_num_rows = reader->maxDoc();
+ exec_ctx.readers = {reader};
+ exec_ctx.field_reader_bindings.emplace(field, reader);
+
+ auto roaring = std::make_shared<roaring::Roaring>();
+ ASSERT_NO_THROW(collect_multi_segment_doc_set(weight, exec_ctx, "",
roaring, nullptr, false));
+
+ EXPECT_EQ(roaring->cardinality(), 2);
+ EXPECT_TRUE(roaring->contains(0));
+ EXPECT_TRUE(roaring->contains(3));
+
+ // NOTE(review): the directories are dec-ref'd here while `reader` is still
+ // alive; it is only released at scope exit, after these calls. This relies on
+ // the readers holding their own Directory references; confirm.
+ _CLDECDELETE(dir0);
+ _CLDECDELETE(dir1);
+}
+
+// readers.front() is a single-segment reader over segment0. The "title" field
+// binding is a reader over a separate 4-doc index; max_buffered_docs = 2 makes
+// that index multi-segment, as asserted below.
+// Collection must split by the field binding's segments. The expected hits are
+// docs 0 and 3 of the 4-doc index.
+TEST_F(MultiSegmentCollectorTest, CollectDocSetWithSegmentedFieldBinding) {
+ auto* dir0 = FSDirectory::getDirectory((kTestDir + "/segment0").c_str());
+
+ auto leading_reader =
make_shared_reader(lucene::index::IndexReader::open(dir0, true));
+
+ const auto multi_segment_dir = kTestDir + "/multi_segment";
+
ASSERT_TRUE(io::global_local_filesystem()->create_directory(multi_segment_dir).ok());
+ create_test_index(multi_segment_dir,
+ {"fleabag premiere", "other title", "history text",
"fleabag finale"}, 2);
+
+ auto* multi_segment_directory =
FSDirectory::getDirectory(multi_segment_dir.c_str());
+ auto field_reader =
+
make_shared_reader(lucene::index::IndexReader::open(multi_segment_directory,
true));
+ // Guard the premise of the test: the field binding really is segmented.
+ const auto* field_segments = sub_readers(field_reader.get());
+ ASSERT_NE(field_segments, nullptr);
+ ASSERT_GT(field_segments->length, 1);
+
+ auto index_query_context = std::make_shared<IndexQueryContext>();
+ auto field = StringHelper::to_wstring("title");
+ TermQuery query(index_query_context, field,
StringHelper::to_wstring("fleabag"));
+ auto weight = query.weight(false);
+
+ QueryExecutionContext exec_ctx;
+ exec_ctx.segment_num_rows = field_reader->maxDoc();
+ exec_ctx.readers = {leading_reader};
+ exec_ctx.field_reader_bindings.emplace(field, field_reader);
+
+ auto roaring = std::make_shared<roaring::Roaring>();
+ ASSERT_NO_THROW(collect_multi_segment_doc_set(weight, exec_ctx, "",
roaring, nullptr, false));
+
+ EXPECT_EQ(roaring->cardinality(), 2);
+ EXPECT_TRUE(roaring->contains(0));
+ EXPECT_TRUE(roaring->contains(3));
+
+ _CLDECDELETE(dir0);
+ _CLDECDELETE(multi_segment_directory);
+}
+
+// The explicit binding key "bound-title" maps to a single-segment reader over
+// segment0, while the "title" field binding is a MultiReader over segment0 +
+// segment1.
+// The test expects the explicit binding to win: only doc 0 matches. The
+// MultiReader's segment1 hit (global doc 3) must not appear.
+TEST_F(MultiSegmentCollectorTest, CollectDocSetWithSingleReaderBinding) {
+ auto* dir0 = FSDirectory::getDirectory((kTestDir + "/segment0").c_str());
+ auto* dir1 = FSDirectory::getDirectory((kTestDir + "/segment1").c_str());
+
+ auto bound_reader =
make_shared_reader(lucene::index::IndexReader::open(dir0, true));
+
+ ValueArray<lucene::index::IndexReader*> readers(2);
+ readers[0] = lucene::index::IndexReader::open(dir0, true);
+ readers[1] = lucene::index::IndexReader::open(dir1, true);
+ auto field_reader = make_shared_reader(_CLNEW
lucene::index::MultiReader(&readers, true));
+
+ auto index_query_context = std::make_shared<IndexQueryContext>();
+ auto field = StringHelper::to_wstring("title");
+ TermQuery query(index_query_context, field,
StringHelper::to_wstring("fleabag"));
+ auto weight = query.weight(false);
+
+ QueryExecutionContext exec_ctx;
+ exec_ctx.segment_num_rows = bound_reader->maxDoc();
+ exec_ctx.readers = {bound_reader};
+ exec_ctx.reader_bindings.emplace("bound-title", bound_reader);
+ exec_ctx.field_reader_bindings.emplace(field, field_reader);
+
+ auto roaring = std::make_shared<roaring::Roaring>();
+ ASSERT_NO_THROW(collect_multi_segment_doc_set(weight, exec_ctx,
"bound-title", roaring, nullptr,
+ false));
+
+ EXPECT_EQ(roaring->cardinality(), 1);
+ EXPECT_TRUE(roaring->contains(0));
+
+ _CLDECDELETE(dir0);
+ _CLDECDELETE(dir1);
+}
+
+} // namespace doris::segment_v2
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]