github-actions[bot] commented on code in PR #63633: URL: https://github.com/apache/doris/pull/63633#discussion_r3325160091
########## be/src/storage/index/inverted/spimi/segment_merger.cpp: ########## @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "storage/index/inverted/spimi/segment_merger.h" + +#include <algorithm> +#include <memory> +#include <queue> +#include <utility> + +#include "common/logging.h" +#include "storage/index/inverted/spimi/field_infos_writer.h" +#include "storage/index/inverted/spimi/freq_prox_encoder.h" +#include "storage/index/inverted/spimi/posting_decoder.h" +#include "storage/index/inverted/spimi/segment_infos_writer.h" +#include "storage/index/inverted/spimi/term_dict_writer.h" +#include "storage/index/inverted/spimi/term_enum.h" + +namespace doris::segment_v2::inverted_index::spimi { + +namespace { + +// Single-input fast path: copies posting bytes (.tis/.tii/.frq/.prx) +// directly from the input to the output without decode/re-encode, +// then rebuilds metadata (.fnm, segments_N, segments.gen) with the +// caller's parameters. +// +// Safe when the single input's encoding matches the output format: +// - doc_offset is 0 (always true for the first/only input) +// - omit_term_freq_and_positions is false (spill always has positions) +// - omit_norms is true (spill always omits norms) +// This covers the V4 (pure SPIMI) path. +int64_t MergeSingleInput(const SegmentMerger::Input& input, const SpimiSegmentSink& sink, + const std::string& segment_name, const std::string& field_name, + int32_t total_doc_count, int32_t index_version, + bool omit_term_freq_and_positions, bool omit_norms) { + // The byte-copy is only valid when the single input's on-disk + // encoding matches these output flags (positions present, norms + // omitted). The dispatch guard in Merge() enforces this; the + // DCHECK makes the coupling crash-loud in debug for any future + // caller that bypasses the guard. + DCHECK(omit_norms && !omit_term_freq_and_positions); + + // Copy all posting bytes directly — no decode/re-encode cycle. + // The single input's doc_offset is 0, so TermInfo pointers in + // .tis remain valid and posting data needs no adjustment. + sink.tis->WriteBytes(input.tis_bytes.data(), input.tis_bytes.size()); + sink.tii->WriteBytes(input.tii_bytes.data(), input.tii_bytes.size()); + sink.frq->WriteBytes(input.frq_bytes.data(), input.frq_bytes.size()); + sink.prx->WriteBytes(input.prx_bytes.data(), input.prx_bytes.size()); + + // Rebuild .fnm with the caller's index_version and field flags. + // The spill's .fnm may have used kIndexVersionV1; the final + // output may need V0. Only .fnm needs rewriting because the + // index_version tag lives there, not in the posting bytes. + { + FieldInfoEntry fi; + fi.name = field_name; + fi.is_indexed = true; + fi.has_prox = !omit_term_freq_and_positions; + fi.omit_norms = omit_norms; + fi.index_version = index_version; + fi.flags = 0; + FieldInfosWriter(sink.fnm).Write({fi}); + } + + // Rebuild segments_N and segments.gen with the correct segment + // name and total doc count. + { + SegmentInfoEntry seg; + seg.name = segment_name; + seg.doc_count = total_doc_count; + seg.del_gen = -1; + seg.doc_store_offset = -1; + seg.has_single_norm_file = true; + seg.is_compound_file = -1; + SegmentInfosWriter manifest_writer; + manifest_writer.WriteSegmentsN(sink.segments_n, /*version=*/1, /*counter=*/1, {seg}); + manifest_writer.WriteSegmentsGen(sink.segments_gen, /*generation=*/1); + } + + // Return the term count from the input's .tis footer. + TermEnum tenum(input.tis_bytes); + return tenum.TotalEntries(); +} + +// One cursor walking a single input segment's TermEnum. +struct MergeCursor { + int32_t input_index = 0; + int32_t doc_offset = 0; // added to each decoded doc_id + TermEnum* tenum = nullptr; +}; + +// Heap entry: the current term from one input segment. +struct HeapEntry { + int32_t field_number; + std::string term_utf8; + int32_t input_index; + int32_t doc_offset; + + // Min-heap: smallest (field, term) wins; input_index breaks ties + // so inputs are processed in spill order. + bool operator>(const HeapEntry& o) const { + if (field_number != o.field_number) { + return field_number > o.field_number; + } + if (term_utf8 != o.term_utf8) { + return term_utf8 > o.term_utf8; + } + return input_index > o.input_index; + } +}; + +// Helper: compute the .prx byte range for one term. +// `prox_start` is the term's prox_pointer (absolute in .prx). +// `prox_end` is the next term's prox_pointer, or prx_bytes.size() +// for the last term. +std::pair<const uint8_t*, size_t> PrxRange(const std::vector<uint8_t>& prx_bytes, + int64_t prox_start, int64_t prox_end) { + if (prox_start < 0 || prox_start >= static_cast<int64_t>(prx_bytes.size())) { + return {nullptr, 0}; + } + if (prox_end <= prox_start) { + return {nullptr, 0}; + } + return {prx_bytes.data() + prox_start, static_cast<size_t>(prox_end - prox_start)}; +} + +} // namespace + +int64_t SegmentMerger::Merge(const std::vector<Input>& inputs, const SpimiSegmentSink& sink, + const std::string& segment_name, const std::string& field_name, + int32_t total_doc_count, int32_t index_version, + bool omit_term_freq_and_positions, bool omit_norms) { + if (inputs.empty()) { + return 0; + } + + // Single-input fast path: when there is exactly one input and + // the output format matches the spill format (has positions, + // omits norms — always true for V4), copy posting bytes + // directly and rebuild only metadata. This eliminates the + // decode/re-encode cycle for the common case where the buffer + // was flushed exactly once before finish. + if (inputs.size() == 1 && !omit_term_freq_and_positions && omit_norms) { + return MergeSingleInput(inputs[0], sink, segment_name, field_name, total_doc_count, + index_version, omit_term_freq_and_positions, omit_norms); + } + + // Compute per-input doc_id offsets. + std::vector<int32_t> doc_offsets(inputs.size(), 0); + { + int32_t running = 0; + for (size_t i = 0; i < inputs.size(); ++i) { + doc_offsets[i] = running; + running += inputs[i].doc_count; + } + } + + // Create TermEnums for each input. + std::vector<std::unique_ptr<TermEnum>> enums; + enums.reserve(inputs.size()); + for (size_t i = 0; i < inputs.size(); ++i) { + enums.push_back(std::make_unique<TermEnum>(inputs[i].tis_bytes)); + } + + // Seed the min-heap with the first term from each input. + using Heap = std::priority_queue<HeapEntry, std::vector<HeapEntry>, std::greater<>>; + Heap heap; + for (size_t i = 0; i < inputs.size(); ++i) { + if (enums[i]->Next()) { + const auto& e = enums[i]->Current(); + heap.push({e.field_number, e.term_utf8, static_cast<int32_t>(i), doc_offsets[i]}); + } + } + + // Open the output writers. + TermDictWriter dict(sink.tis, sink.tii, TermDictWriter::kDefaultIndexInterval, + TermDictWriter::kDefaultSkipInterval); + FreqProxEncoder encoder(sink.frq, sink.prx, TermDictWriter::kDefaultSkipInterval, + TermDictWriter::kMaxSkipLevels, omit_term_freq_and_positions); + + int64_t term_count = 0; + + while (!heap.empty()) { + // Pop the smallest (field, term). Collect all inputs that + // share this exact (field, term). + const auto top = heap.top(); + heap.pop(); + + const int32_t cur_field = top.field_number; + const std::string& cur_term = top.term_utf8; + + // Gather decoded docs from all inputs that have this term. + std::vector<DecodedDoc> merged_docs; + + // Process the first (smallest) input for this term. + auto process_input = [&](int32_t idx, int32_t offset) { + const auto& input = inputs[idx]; + const auto& entry = enums[idx]->Current(); + + // Compute the .frq byte range for this term. + // Use the next entry's freq_pointer or frq_bytes.size(). + const int64_t frq_start = entry.info.freq_pointer; + // We don't easily know the end without peeking ahead. + // Use the full remaining buffer — PostingDecoder stops + // after doc_freq docs anyway. + const size_t frq_len = + static_cast<size_t>(input.frq_bytes.size()) - static_cast<size_t>(frq_start); + + // Compute the .prx byte range. + const int64_t prx_start = entry.info.prox_pointer; + const int64_t prx_end = static_cast<int64_t>(input.prx_bytes.size()); + auto [prx_ptr, prx_len] = PrxRange(input.prx_bytes, prx_start, prx_end); + + auto docs = PostingDecoder::Decode(input.frq_bytes.data() + frq_start, frq_len, prx_ptr, + prx_len, entry.info.doc_freq, + !omit_term_freq_and_positions); + + // Apply doc_id offset and append. + for (auto& d : docs) { + d.doc_id += offset; + } Review Comment: This offset corrupts production V4 spills. `InvertedIndexColumnWriter::add_values()` appends the segment-level `_rid` into `SpimiPostingBuffer`, and `SpillManager::FlushBuffer()` emits those doc ids as-is, so spill inputs already contain absolute row ids. When a large segment crosses the SPIMI memory budget more than once, the second and later spills get `running` added again here, shifting postings beyond their real rows (often beyond `total_doc_count`) and causing MATCH queries to return wrong/missing rows. The current merger tests build artificial inputs with local doc ids, so they do not cover the actual writer->spill contract. Please either make spill buffers localize doc ids before emitting, or remove this offset for V4 spill inputs and add an end-to-end test with multiple spills using absolute `_rid` values. ########## be/src/storage/index/inverted/inverted_index_writer.cpp: ########## @@ -367,6 +460,135 @@ template <FieldType field_type> Status InvertedIndexColumnWriter<field_type>::add_values(const std::string fn, const void* values, size_t count) { if constexpr (field_is_slice_type(field_type)) { + if (_is_v4) { + // V4 = pure SPIMI. No CLucene Document/Field/IndexWriter + // touched. Tokens flow direct: analyzer.reusableTokenStream + // → next() loop → SpimiPostingBuffer::Append. This is the + // path that delivers the SPIMI memory-savings target. + // + // The analyzer / token-stream calls below can throw + // CLuceneError (custom tokenizers, filter exceptions); + // wrap the whole loop in try/catch so CLucene exceptions + // and DORIS_CHECK-thrown doris::Exception convert to + // Status rather than escaping `add_values` past the + // segment writer's caller. Matches the catch placement + // in `add_document()` for the V1/V2/V3 path. + try { + const auto* v = (Slice*)values; + for (size_t i = 0; i < count; ++i) { + if ((!_should_analyzer && v->get_size() > _ignore_above) || + (_should_analyzer && v->empty())) { + // Empty / over-limit value → no tokens to index, + // but the ROW IS NOT NULL. `_null_bitmap` must + // mirror only true upstream nulls (added by + // `add_nulls`); empty strings have a non-null + // column value of "" and `IS NULL` must return + // false for them. Mirrors V2's behavior at + // `add_values:615` which calls `add_null_document` + // (a CLucene "null-doc" marker that does NOT + // touch `_null_bitmap`). The earlier V4 impl + // added these to `_null_bitmap`, causing + // `WHERE body IS NULL` to incorrectly return + // empty-string rows. + } else if (_should_analyzer) { + // Tokenize. Mirror TeeTokenStream::next: position + // accumulator clamped to 0 on first token to + // handle the synonym-overlay-as-first-token edge + // case CLucene's DocumentsWriter normalises. + _char_string_reader->init(v->get_data(), cast_set<int32_t>(v->get_size()), + false); + auto* stream = _analyzer->reusableTokenStream(_field_name.c_str(), + _char_string_reader); + // `reusableTokenStream` returning null means the + // analyzer is mis-configured at init time — a + // programmer error, not a runtime input. Per + // CLAUDE.md's "assert correctness, no defensive + // if" rule, crash via DORIS_CHECK rather than + // silently dropping the row's tokens. + DORIS_CHECK(stream != nullptr); + stream->reset(); + lucene::analysis::Token tok; + int32_t pos = -1; + bool first_token = true; + while (stream->next(&tok) != nullptr) { + pos += tok.getPositionIncrement(); + if (first_token && pos < 0) { + pos = 0; + } + first_token = false; + const char* term_buf = tok.template termBuffer<char>(); + const size_t term_len = tok.template termLength<char>(); + // Skip zero-length tokens (legitimate output of + // some filters). `term_buf` is non-null by + // analyzer contract when term_len > 0 — no + // defensive guard. + if (term_len > 0) { + _spimi_writer->AppendToken(std::string_view(term_buf, term_len), + static_cast<uint32_t>(_rid), + static_cast<uint32_t>(pos)); + // Mid-row saturation check. The buffer's + // `Append` is silently no-op once + // saturated; without polling here the + // remaining tokens of THIS row would be + // silently dropped before the outer row- + // boundary poll catches it. Throw inside + // the try so the existing catch records + // context + calls close_on_error. + if (_spimi_writer->Saturated()) [[unlikely]] { + _CLTHROWA(CL_ERR_IO, + "V4 SPIMI buffer saturated mid-row: " + "subsequent tokens would be dropped"); + } + } + } + } else { + // Non-analyzed (keyword) string: append whole + // value at position 0 — same semantics CLucene's + // setValue(char*, len) produces. + _spimi_writer->AppendToken(std::string_view(v->get_data(), v->get_size()), + static_cast<uint32_t>(_rid), 0); + } + // Poll saturation after each row's worth of Appends. + // The buffer's `Append` is void / silent on + // saturation — under V4 we must surface the error + // immediately instead of letting subsequent rows + // silently drop their tokens before `finish()` + // ultimately fails. Shadow mode (V1/V2/V3) keeps the + // existing silent-drop behaviour: CLucene is the + // primary, the shadow buffer is best-effort. + if (_spimi_writer->Saturated()) [[unlikely]] { + return Status::Error<doris::ErrorCode::INVERTED_INDEX_FILE_CORRUPTED>( + "V4 SPIMI buffer saturated mid-batch for field {}: subsequent " + "tokens " + "would be dropped silently", + std::string(_field_name.begin(), _field_name.end())); + } + // Spill check: if the buffer exceeded the memory + // budget, flush it to an in-memory spill segment + // and continue accepting tokens. + if (_spimi_writer->ShouldFlush()) { + _spimi_writer->FlushPending(_spimi_doc_count); + } Review Comment: The doc count passed to the spill is stale for the row that triggered the flush. At this point the current row's tokens have already been appended with doc id `_rid`, but `_spimi_doc_count` is only advanced below this block. If this is the first row past the budget, the spill manifest can advertise a `doc_count` that excludes a doc whose postings are present. That stale count is then used by `SegmentMerger` for offsets/metadata. Update `_spimi_doc_count` before `FlushPending()` (or pass `static_cast<int32_t>(_rid) + 1`) so each spill's metadata covers all postings it contains. ########## fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java: ########## @@ -1275,11 +1277,15 @@ public static TInvertedIndexFileStorageFormat analyzeInvertedIndexFileStorageFor return TInvertedIndexFileStorageFormat.V2; } else if (invertedIndexFileStorageFormat.equalsIgnoreCase("v3")) { return TInvertedIndexFileStorageFormat.V3; + } else if (invertedIndexFileStorageFormat.equalsIgnoreCase("v4")) { + return TInvertedIndexFileStorageFormat.V4; } else if (invertedIndexFileStorageFormat.equalsIgnoreCase("default")) { Review Comment: Exposing explicit `V4` here needs matching FE validation for the supported index shapes. The BE writer now rejects V4 when `should_analyzer` is false and also rejects array string indexes, but `InvertedIndexUtil.checkInvertedIndexParser()` still accepts parser `none` and array-with-parser-none for the same table property. That lets `CREATE TABLE ... PROPERTIES("inverted_index_storage_format"="V4")` succeed for keyword/array inverted indexes, and the first load then fails in BE writer init. Please reject unsupported V4 parser/type combinations during analysis, or implement those paths end-to-end. ########## regression-test/suites/inverted_index_p0/storage_format/test_storage_format_v4_query_latency.groovy: ########## @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// V2 vs V4 query latency benchmark. +// +// UT-level benchmarking of the V4 read path (SpimiQueryIndexReader) +// is blocked: the V4 reader stack depends on global state that BE +// startup initializes but unit-test fixtures do not, and any in-process +// V4 query in a `InvertedIndexReaderTest`-style fixture segfaults. +// Production read paths work — the existing +// `test_storage_format_v4` regression already proves V2/V4 correctness +// parity via `order_qt_v2v4_*` comparisons. +// +// This suite adds the missing piece: **timing**. Same data, same query, +// 12 runs each through the real query planner + executor + reader +// stack against the running cluster. Reports min / p25 / median / p75 / +// max for both formats. The first 3 runs are discarded as warmup +// (page cache + tablet schema cache + searcher cache fill). V2/V4 +// alternation per iteration prevents one side from accumulating an +// unfair cache benefit. +// +// Caps: median V4 must not exceed 1.5x V2 median. V4's reader uses +// SpimiQueryIndexReader + PFOR decoder + per-segment metadata cache; +// any path that adds >50 % over CLucene's reader is a real regression. +suite("test_storage_format_v4_query_latency", "p0") { + def v2Table = "qlat_v2" + def v4Table = "qlat_v4" + sql "DROP TABLE IF EXISTS ${v2Table}" + sql "DROP TABLE IF EXISTS ${v4Table}" + + // Same schema as `test_storage_format_v4`'s parity tables. Single + // bucket so per-query work is concentrated on one segment file — + // makes the timing signal cleaner than multi-bucket distribution. + def schema = { fmt, tbl -> """ + CREATE TABLE ${tbl} ( + id int NULL, body string NULL, + INDEX body_idx (body) USING INVERTED + PROPERTIES("parser"="english", "lower_case"="true", + "support_phrase"="true") + ) ENGINE=OLAP DUPLICATE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ("replication_allocation"="tag.location.default: 1", + "inverted_index_storage_format"="${fmt}", + "disable_auto_compaction"="true") + """ } + sql schema('V2', v2Table) + sql schema('V4', v4Table) + + // Generate 1000 rows: 600 with "common term", 200 with "apple + // fruit", 200 with "banana fruit". Query for "common" hits the + // largest posting list — the workload where reader differences + // surface fastest. + def fillRows = { tbl -> + def chunkSize = 100 + for (int chunk = 0; chunk < 10; chunk++) { + def values = [] + for (int i = 0; i < chunkSize; i++) { + def rowId = chunk * chunkSize + i + def text + if (rowId < 600) { text = "common term row${rowId}" } + else if (rowId < 800) { text = "apple fruit row${rowId}" } + else { text = "banana fruit row${rowId}" } + values.add("(${rowId}, '${text}')") + } + sql "INSERT INTO ${tbl} VALUES ${values.join(',')}" + } + } + fillRows(v2Table) + fillRows(v4Table) + + // Trigger segment compaction is disabled (DISABLE_AUTO_COMPACTION=true) + // so the inserts above produced multiple small segments per tablet. + // For a benchmark we want ONE large segment per format so per-query + // work is comparable. Force a compaction now. + // (Best-effort; if the cluster rejects it we proceed with multi- + // segment timing, which still produces a fair V2-vs-V4 ratio.) + try { + sql "SET enable_segcompaction = true" + } catch (Exception ignored) { /* older versions don't expose this */ } + + // Timed query helper. Calls the same SELECT statement N+kWarmup + // times, drops the first kWarmup as warmup, returns the timing + // distribution in microseconds. + def runQueryTiming = { tbl, predicate -> + def kIterations = 12 + def kWarmup = 3 + def samples = [] + for (int i = 0; i < kIterations; i++) { + def t0 = System.nanoTime() + sql "SELECT id FROM ${tbl} WHERE ${predicate} ORDER BY id" + def t1 = System.nanoTime() + samples.add((t1 - t0) / 1000.0) // ns -> us + } + // Drop warmup runs in arrival order, then sort for percentile. + samples = samples.drop(kWarmup).sort() + def n = samples.size() + def pct = { p -> + def idx = (n - 1) * p + def lo = (int) idx + def hi = Math.min(lo + 1, n - 1) + def frac = idx - lo + return samples[lo] * (1.0 - frac) + samples[hi] * frac + } + return [ + min: samples[0], + p25: pct(0.25), + median: samples[(int)(n / 2)], + p75: pct(0.75), + max: samples[n - 1], + ] + } + + // Compare V2 and V4 on the same query. Alternate which format runs + // first so V4 doesn't always inherit V2's warmed-up state. + def compareQuery = { tag, predicate -> + // Interleaved iterations — fold the warmup-discard + percentile + // calc on each format's samples separately. + def v2Samples = [] + def v4Samples = [] + def kIterations = 12 + for (int i = 0; i < kIterations; i++) { + if ((i & 1) == 0) { + def t0 = System.nanoTime() + sql "SELECT id FROM ${v2Table} WHERE ${predicate} ORDER BY id" + v2Samples.add((System.nanoTime() - t0) / 1000.0) + def t1 = System.nanoTime() + sql "SELECT id FROM ${v4Table} WHERE ${predicate} ORDER BY id" + v4Samples.add((System.nanoTime() - t1) / 1000.0) + } else { + def t1 = System.nanoTime() + sql "SELECT id FROM ${v4Table} WHERE ${predicate} ORDER BY id" + v4Samples.add((System.nanoTime() - t1) / 1000.0) + def t0 = System.nanoTime() + sql "SELECT id FROM ${v2Table} WHERE ${predicate} ORDER BY id" + v2Samples.add((System.nanoTime() - t0) / 1000.0) + } + } + def summarize = { raw -> + def s = raw.drop(3).sort() + def n = s.size() + return [min: s[0], median: s[(int)(n / 2)], max: s[n - 1]] + } + def v2 = summarize(v2Samples) + def v4 = summarize(v4Samples) + def ratio = v4.median / v2.median + log.info("[${tag}] V2 min/median/max = ${v2.min}/${v2.median}/${v2.max} us; " + + "V4 = ${v4.min}/${v4.median}/${v4.max} us; ratio(median) ${ratio}") + + // Caps loose at 2.0 because cluster-side timing carries planner, + // executor, network round-trip, and BE thread-scheduling noise + // on top of the actual reader latency. A real reader regression + // would shift the median far past this cap. + assertTrue(ratio < 2.0, + "${tag}: V4 median ${v4.median} us / V2 median ${v2.median} us = ${ratio} " + Review Comment: A p0 regression suite should not fail on wall-clock query timing. This median ratio includes planner/executor startup, cache state, BE scheduling, network/runner noise, and unrelated concurrent load; on shared CI it can exceed 2x without a functional regression, especially with only 9 retained samples. This will create flaky failures for unrelated changes. Please keep this as logging/manual benchmark coverage, move it out of p0/per-PR execution, or replace the assertion with deterministic functional checks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
