This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 03b7afda992 [fix](inverted index) Split bound multi-segment readers 
(#63138)
03b7afda992 is described below

commit 03b7afda9927f552eebb99d478f2f76f49942099
Author: Jack <[email protected]>
AuthorDate: Mon May 25 17:01:21 2026 +0800

    [fix](inverted index) Split bound multi-segment readers (#63138)
    
    ### What problem does this PR solve?
    
    Issue Number: DORIS-25499
    
    Related PR: None
    
    Problem Summary:
    The query_v2 search collector drives segment iteration from
    `readers.front()`, while leaf scorers resolve their CLucene readers from
    `reader_bindings` or `field_reader_bindings`. If the reader that a leaf
    scorer resolves is still a multi-segment reader, `SegmentPostings` calls
    `TermDocs::readBlock()`, which CLucene does not support on
    multi-segment readers, and the call can abort the BE.
    
    This PR selects the actual segmented reader from the execution context,
    rewrites all readers and reader bindings to segment-level readers for
    each callback, validates segmented reader topology, and keeps
    `MultiSegmentReader` / `MultiReader` doc base handling explicit. It also
    adds regression coverage for `MultiReader`, segmented field bindings,
    and explicit single-reader binding keys.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test
        - [x] Regression test
            - Added `MultiSegmentCollectorTest.CollectDocSetWithMultiReader`
            - Added `MultiSegmentCollectorTest.CollectDocSetWithSegmentedFieldBinding`
            - Added `MultiSegmentCollectorTest.CollectDocSetWithSingleReaderBinding`
        - [x] Unit Test
            - `env CCACHE_DIR=/tmp/doris25499-master-ccache CCACHE_TEMPDIR=/tmp/doris25499-master-ccache-tmp ./run-be-ut.sh --run --filter=MultiSegmentCollectorTest.* -j 32`
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes.
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes.
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label
---
 .../inverted/query_v2/collect/multi_segment_util.h | 153 ++++++++++++---
 .../query_v2/multi_segment_collector_test.cpp      | 216 +++++++++++++++++++++
 2 files changed, 347 insertions(+), 22 deletions(-)

diff --git 
a/be/src/storage/index/inverted/query_v2/collect/multi_segment_util.h 
b/be/src/storage/index/inverted/query_v2/collect/multi_segment_util.h
index 0519fe172c9..4e67b5aaacc 100644
--- a/be/src/storage/index/inverted/query_v2/collect/multi_segment_util.h
+++ b/be/src/storage/index/inverted/query_v2/collect/multi_segment_util.h
@@ -31,6 +31,7 @@
 #pragma GCC diagnostic ignored "-Woverloaded-virtual"
 #endif
 #include "CLucene.h"
+#include "CLucene/index/MultiReader.h"
 #include "CLucene/index/_MultiSegmentReader.h"
 #ifdef __clang__
 #pragma clang diagnostic pop
@@ -40,18 +41,133 @@
 
 namespace doris::segment_v2::inverted_index::query_v2 {
 
-inline QueryExecutionContext 
create_segment_context(lucene::index::IndexReader* seg_reader,
-                                                    const 
QueryExecutionContext& original_ctx,
+inline std::shared_ptr<lucene::index::IndexReader> non_owning_reader(
+        lucene::index::IndexReader* reader) {
+    return {reader, [](lucene::index::IndexReader*) {}};
+}
+
+inline const lucene::util::ArrayBase<lucene::index::IndexReader*>* sub_readers(
+        lucene::index::IndexReader* reader) {
+    if (auto* multi_segment_reader = 
dynamic_cast<lucene::index::MultiSegmentReader*>(reader)) {
+        return multi_segment_reader->getSubReaders();
+    }
+    if (auto* multi_reader = 
dynamic_cast<lucene::index::MultiReader*>(reader)) {
+        return multi_reader->getSubReaders();
+    }
+    return nullptr;
+}
+
+inline const int32_t* segment_starts(lucene::index::IndexReader* reader) {
+    if (auto* multi_segment_reader = 
dynamic_cast<lucene::index::MultiSegmentReader*>(reader)) {
+        return multi_segment_reader->getStarts();
+    }
+    return nullptr;
+}
+
+inline uint32_t segment_base(lucene::index::IndexReader* reader, size_t 
segment_index) {
+    const auto* starts = segment_starts(reader);
+    if (starts != nullptr) {
+        return static_cast<uint32_t>(starts[segment_index]);
+    }
+
+    const auto* segments = sub_readers(reader);
+    DCHECK(segments != nullptr);
+    uint32_t base = 0;
+    for (size_t i = 0; i < segment_index; ++i) {
+        base += (*segments)[i]->maxDoc();
+    }
+    return base;
+}
+
+inline std::shared_ptr<lucene::index::IndexReader> find_segmented_reader(
+        const QueryExecutionContext& context, const std::string& binding_key) {
+    if (!binding_key.empty()) {
+        if (auto it = context.reader_bindings.find(binding_key);
+            it != context.reader_bindings.end()) {
+            return sub_readers(it->second.get()) != nullptr ? it->second : 
nullptr;
+        }
+    }
+
+    for (const auto& reader : context.readers) {
+        if (sub_readers(reader.get()) != nullptr) {
+            return reader;
+        }
+    }
+    for (const auto& [_, reader] : context.reader_bindings) {
+        if (sub_readers(reader.get()) != nullptr) {
+            return reader;
+        }
+    }
+    for (const auto& [_, reader] : context.field_reader_bindings) {
+        if (sub_readers(reader.get()) != nullptr) {
+            return reader;
+        }
+    }
+    return nullptr;
+}
+
+inline void validate_segment_topology(
+        const std::shared_ptr<lucene::index::IndexReader>& reader,
+        const std::shared_ptr<lucene::index::IndexReader>& driver_reader) {
+    const auto* segments = sub_readers(reader.get());
+    if (segments == nullptr) {
+        return;
+    }
+
+    const auto* driver_segments = sub_readers(driver_reader.get());
+    DCHECK(driver_segments != nullptr);
+    DCHECK_EQ(segments->length, driver_segments->length);
+    for (size_t i = 0; i < driver_segments->length; ++i) {
+        DCHECK_EQ(segment_base(reader.get(), i), 
segment_base(driver_reader.get(), i));
+        DCHECK_EQ((*segments)[i]->maxDoc(), (*driver_segments)[i]->maxDoc());
+    }
+}
+
+inline void validate_segment_topologies(
+        const QueryExecutionContext& context,
+        const std::shared_ptr<lucene::index::IndexReader>& driver_reader) {
+    for (const auto& reader : context.readers) {
+        validate_segment_topology(reader, driver_reader);
+    }
+    for (const auto& [_, reader] : context.reader_bindings) {
+        validate_segment_topology(reader, driver_reader);
+    }
+    for (const auto& [_, reader] : context.field_reader_bindings) {
+        validate_segment_topology(reader, driver_reader);
+    }
+}
+
+inline std::shared_ptr<lucene::index::IndexReader> reader_for_segment(
+        const std::shared_ptr<lucene::index::IndexReader>& reader, size_t 
segment_index) {
+    const auto* segments = sub_readers(reader.get());
+    if (segments != nullptr) {
+        DCHECK_LT(segment_index, segments->length);
+        return non_owning_reader((*segments)[segment_index]);
+    }
+    return reader;
+}
+
+inline QueryExecutionContext create_segment_context(const 
QueryExecutionContext& original_ctx,
+                                                    size_t segment_index, 
uint32_t segment_num_rows,
                                                     const std::string& 
binding_key) {
     QueryExecutionContext seg_ctx;
-    seg_ctx.segment_num_rows = seg_reader->numDocs();
 
-    auto reader_ptr = std::shared_ptr<lucene::index::IndexReader>(
-            seg_reader, [](lucene::index::IndexReader*) {});
-    seg_ctx.readers.push_back(reader_ptr);
+    for (const auto& reader : original_ctx.readers) {
+        seg_ctx.readers.push_back(reader_for_segment(reader, segment_index));
+    }
+
+    seg_ctx.segment_num_rows = segment_num_rows;
 
-    if (!binding_key.empty()) {
-        seg_ctx.reader_bindings[binding_key] = reader_ptr;
+    for (const auto& [key, reader] : original_ctx.reader_bindings) {
+        seg_ctx.reader_bindings[key] = reader_for_segment(reader, 
segment_index);
+    }
+    for (const auto& [field, reader] : original_ctx.field_reader_bindings) {
+        seg_ctx.field_reader_bindings[field] = reader_for_segment(reader, 
segment_index);
+    }
+
+    if (!binding_key.empty() && !seg_ctx.readers.empty() &&
+        seg_ctx.reader_bindings.find(binding_key) == 
seg_ctx.reader_bindings.end()) {
+        seg_ctx.reader_bindings[binding_key] = seg_ctx.readers.front();
     }
 
     seg_ctx.binding_fields = original_ctx.binding_fields;
@@ -63,8 +179,8 @@ inline QueryExecutionContext 
create_segment_context(lucene::index::IndexReader*
 template <typename SegmentCallback>
 void for_each_index_segment(const QueryExecutionContext& context, const 
std::string& binding_key,
                             SegmentCallback&& callback) {
-    auto* reader = context.readers.empty() ? nullptr : 
context.readers.front().get();
-    if (!reader) {
+    auto segmented_reader = find_segmented_reader(context, binding_key);
+    if (!segmented_reader) {
         // No reader available (e.g., AllQuery/MatchAllDocsQuery which doesn't 
resolve fields).
         // Fall back to using the original context directly, as AllScorer only 
needs segment_num_rows.
         if (context.segment_num_rows > 0) {
@@ -73,23 +189,16 @@ void for_each_index_segment(const QueryExecutionContext& 
context, const std::str
         return;
     }
 
-    auto* multi_reader = 
dynamic_cast<lucene::index::MultiSegmentReader*>(reader);
-    if (multi_reader == nullptr) {
-        callback(context, 0);
-        return;
-    }
-
-    const auto* sub_readers = multi_reader->getSubReaders();
-    const auto* starts = multi_reader->getStarts();
-
+    const auto* sub_readers = query_v2::sub_readers(segmented_reader.get());
     if (!sub_readers || sub_readers->length == 0) {
         return;
     }
 
+    validate_segment_topologies(context, segmented_reader);
     for (size_t i = 0; i < sub_readers->length; ++i) {
-        auto* seg_reader = (*sub_readers)[i];
-        auto seg_base = static_cast<uint32_t>(starts[i]);
-        QueryExecutionContext seg_ctx = create_segment_context(seg_reader, 
context, binding_key);
+        auto seg_base = segment_base(segmented_reader.get(), i);
+        QueryExecutionContext seg_ctx =
+                create_segment_context(context, i, 
(*sub_readers)[i]->numDocs(), binding_key);
         callback(seg_ctx, seg_base);
     }
 }
diff --git 
a/be/test/storage/index/inverted/query_v2/multi_segment_collector_test.cpp 
b/be/test/storage/index/inverted/query_v2/multi_segment_collector_test.cpp
new file mode 100644
index 00000000000..4a04421c78a
--- /dev/null
+++ b/be/test/storage/index/inverted/query_v2/multi_segment_collector_test.cpp
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <CLucene.h>
+#include <CLucene/index/MultiReader.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <roaring/roaring.hh>
+#include <string>
+#include <vector>
+
+#include "io/fs/local_file_system.h"
+#include "storage/index/index_query_context.h"
+#include "storage/index/inverted/analyzer/custom_analyzer.h"
+#include "storage/index/inverted/query_v2/collect/doc_set_collector.h"
+#include "storage/index/inverted/query_v2/collect/multi_segment_util.h"
+#include "storage/index/inverted/query_v2/term_query/term_query.h"
+#include "storage/index/inverted/util/string_helper.h"
+
+CL_NS_USE(index)
+CL_NS_USE(store)
+CL_NS_USE(util)
+
+namespace doris::segment_v2 {
+
+using namespace inverted_index;
+using namespace inverted_index::query_v2;
+
+class MultiSegmentCollectorTest : public testing::Test {
+public:
+    const std::string kTestDir = "./ut_dir/multi_segment_collector_test";
+
+    void SetUp() override {
+        auto st = io::global_local_filesystem()->delete_directory(kTestDir);
+        ASSERT_TRUE(st.ok()) << st;
+        st = io::global_local_filesystem()->create_directory(kTestDir);
+        ASSERT_TRUE(st.ok()) << st;
+        st = io::global_local_filesystem()->create_directory(kTestDir + 
"/segment0");
+        ASSERT_TRUE(st.ok()) << st;
+        st = io::global_local_filesystem()->create_directory(kTestDir + 
"/segment1");
+        ASSERT_TRUE(st.ok()) << st;
+
+        create_test_index(kTestDir + "/segment0", {"fleabag premiere", "other 
title"});
+        create_test_index(kTestDir + "/segment1", {"history text", "fleabag 
finale"});
+    }
+
+    void TearDown() override {
+        
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(kTestDir).ok());
+    }
+
+private:
+    static void create_test_index(const std::string& dir, const 
std::vector<std::string>& docs,
+                                  int32_t max_buffered_docs = 100) {
+        CustomAnalyzerConfig::Builder builder;
+        builder.with_tokenizer_config("standard", {});
+        auto custom_analyzer_config = builder.build();
+        auto custom_analyzer = 
CustomAnalyzer::build_custom_analyzer(custom_analyzer_config);
+
+        auto* index_writer =
+                _CLNEW lucene::index::IndexWriter(dir.c_str(), 
custom_analyzer.get(), true);
+        index_writer->setMaxBufferedDocs(max_buffered_docs);
+        index_writer->setRAMBufferSizeMB(-1);
+        index_writer->setMaxFieldLength(0x7FFFFFFFL);
+        index_writer->setMergeFactor(1000000000);
+        index_writer->setUseCompoundFile(false);
+
+        auto char_string_reader = 
std::make_shared<lucene::util::SStringReader<char>>();
+        auto* doc = _CLNEW lucene::document::Document();
+        int32_t field_config = lucene::document::Field::STORE_NO;
+        field_config |= lucene::document::Field::INDEX_NONORMS;
+        field_config |= lucene::document::Field::INDEX_TOKENIZED;
+        auto field_name_w = StringHelper::to_wstring("title");
+        auto* field = _CLNEW lucene::document::Field(field_name_w.c_str(), 
field_config);
+        field->setOmitTermFreqAndPositions(false);
+        doc->add(*field);
+
+        for (const auto& data : docs) {
+            char_string_reader->init(data.data(), data.size(), false);
+            auto* stream = custom_analyzer->reusableTokenStream(field->name(), 
char_string_reader);
+            field->setValue(stream);
+            index_writer->addDocument(doc);
+        }
+
+        index_writer->close();
+        _CLLDELETE(index_writer);
+        _CLLDELETE(doc);
+    }
+};
+
+static std::shared_ptr<lucene::index::IndexReader> make_shared_reader(
+        lucene::index::IndexReader* raw_reader) {
+    return {raw_reader, [](lucene::index::IndexReader* reader) {
+                if (reader != nullptr) {
+                    reader->close();
+                    _CLDELETE(reader);
+                }
+            }};
+}
+
+TEST_F(MultiSegmentCollectorTest, CollectDocSetWithMultiReader) {
+    auto* dir0 = FSDirectory::getDirectory((kTestDir + "/segment0").c_str());
+    auto* dir1 = FSDirectory::getDirectory((kTestDir + "/segment1").c_str());
+
+    ValueArray<lucene::index::IndexReader*> readers(2);
+    readers[0] = lucene::index::IndexReader::open(dir0, true);
+    readers[1] = lucene::index::IndexReader::open(dir1, true);
+    auto reader = make_shared_reader(_CLNEW 
lucene::index::MultiReader(&readers, true));
+
+    auto index_query_context = std::make_shared<IndexQueryContext>();
+    auto field = StringHelper::to_wstring("title");
+    TermQuery query(index_query_context, field, 
StringHelper::to_wstring("fleabag"));
+    auto weight = query.weight(false);
+
+    QueryExecutionContext exec_ctx;
+    exec_ctx.segment_num_rows = reader->maxDoc();
+    exec_ctx.readers = {reader};
+    exec_ctx.field_reader_bindings.emplace(field, reader);
+
+    auto roaring = std::make_shared<roaring::Roaring>();
+    ASSERT_NO_THROW(collect_multi_segment_doc_set(weight, exec_ctx, "", 
roaring, nullptr, false));
+
+    EXPECT_EQ(roaring->cardinality(), 2);
+    EXPECT_TRUE(roaring->contains(0));
+    EXPECT_TRUE(roaring->contains(3));
+
+    _CLDECDELETE(dir0);
+    _CLDECDELETE(dir1);
+}
+
+TEST_F(MultiSegmentCollectorTest, CollectDocSetWithSegmentedFieldBinding) {
+    auto* dir0 = FSDirectory::getDirectory((kTestDir + "/segment0").c_str());
+
+    auto leading_reader = 
make_shared_reader(lucene::index::IndexReader::open(dir0, true));
+
+    const auto multi_segment_dir = kTestDir + "/multi_segment";
+    
ASSERT_TRUE(io::global_local_filesystem()->create_directory(multi_segment_dir).ok());
+    create_test_index(multi_segment_dir,
+                      {"fleabag premiere", "other title", "history text", 
"fleabag finale"}, 2);
+
+    auto* multi_segment_directory = 
FSDirectory::getDirectory(multi_segment_dir.c_str());
+    auto field_reader =
+            
make_shared_reader(lucene::index::IndexReader::open(multi_segment_directory, 
true));
+    const auto* field_segments = sub_readers(field_reader.get());
+    ASSERT_NE(field_segments, nullptr);
+    ASSERT_GT(field_segments->length, 1);
+
+    auto index_query_context = std::make_shared<IndexQueryContext>();
+    auto field = StringHelper::to_wstring("title");
+    TermQuery query(index_query_context, field, 
StringHelper::to_wstring("fleabag"));
+    auto weight = query.weight(false);
+
+    QueryExecutionContext exec_ctx;
+    exec_ctx.segment_num_rows = field_reader->maxDoc();
+    exec_ctx.readers = {leading_reader};
+    exec_ctx.field_reader_bindings.emplace(field, field_reader);
+
+    auto roaring = std::make_shared<roaring::Roaring>();
+    ASSERT_NO_THROW(collect_multi_segment_doc_set(weight, exec_ctx, "", 
roaring, nullptr, false));
+
+    EXPECT_EQ(roaring->cardinality(), 2);
+    EXPECT_TRUE(roaring->contains(0));
+    EXPECT_TRUE(roaring->contains(3));
+
+    _CLDECDELETE(dir0);
+    _CLDECDELETE(multi_segment_directory);
+}
+
+TEST_F(MultiSegmentCollectorTest, CollectDocSetWithSingleReaderBinding) {
+    auto* dir0 = FSDirectory::getDirectory((kTestDir + "/segment0").c_str());
+    auto* dir1 = FSDirectory::getDirectory((kTestDir + "/segment1").c_str());
+
+    auto bound_reader = 
make_shared_reader(lucene::index::IndexReader::open(dir0, true));
+
+    ValueArray<lucene::index::IndexReader*> readers(2);
+    readers[0] = lucene::index::IndexReader::open(dir0, true);
+    readers[1] = lucene::index::IndexReader::open(dir1, true);
+    auto field_reader = make_shared_reader(_CLNEW 
lucene::index::MultiReader(&readers, true));
+
+    auto index_query_context = std::make_shared<IndexQueryContext>();
+    auto field = StringHelper::to_wstring("title");
+    TermQuery query(index_query_context, field, 
StringHelper::to_wstring("fleabag"));
+    auto weight = query.weight(false);
+
+    QueryExecutionContext exec_ctx;
+    exec_ctx.segment_num_rows = bound_reader->maxDoc();
+    exec_ctx.readers = {bound_reader};
+    exec_ctx.reader_bindings.emplace("bound-title", bound_reader);
+    exec_ctx.field_reader_bindings.emplace(field, field_reader);
+
+    auto roaring = std::make_shared<roaring::Roaring>();
+    ASSERT_NO_THROW(collect_multi_segment_doc_set(weight, exec_ctx, 
"bound-title", roaring, nullptr,
+                                                  false));
+
+    EXPECT_EQ(roaring->cardinality(), 1);
+    EXPECT_TRUE(roaring->contains(0));
+
+    _CLDECDELETE(dir0);
+    _CLDECDELETE(dir1);
+}
+
+} // namespace doris::segment_v2


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to