This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-vector-index.git
The following commit(s) were added to refs/heads/main by this push:
new 3473e60 Reuse batch reader probed lists (#14)
3473e60 is described below
commit 3473e6056954b40a820555befe98eeb90293be26
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 8 19:20:27 2026 +0800
Reuse batch reader probed lists (#14)
---
core/src/ivfpq.rs | 162 ++++++++++++++++++++++++++----------------------------
1 file changed, 79 insertions(+), 83 deletions(-)
diff --git a/core/src/ivfpq.rs b/core/src/ivfpq.rs
index 093997d..e9c0974 100644
--- a/core/src/ivfpq.rs
+++ b/core/src/ivfpq.rs
@@ -23,7 +23,7 @@ use crate::kmeans::{self, KMeansConfig};
use crate::opq::OPQMatrix;
use crate::pq::ProductQuantizer;
use rayon::prelude::*;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
use std::io;
/// IVF-PQ index aligned with Faiss's IndexIVFPQ.
@@ -1173,12 +1173,17 @@ pub fn search_batch_reader<R: SeekRead>(
nprobe,
);
- // Step 3: Group (query_idx, probe_rank) pairs by list_id
- let mut list_to_queries: Vec<Vec<(usize, f32)>> = vec![Vec::new(); reader.nlist];
+ // Step 3: Group (query_idx, coarse_dist) pairs by probed list_id only.
+ let mut list_to_queries: HashMap<usize, Vec<(usize, f32)>> = HashMap::new();
+ let mut unique_lists = Vec::new();
for qi in 0..nq {
for (rank, &list_id) in all_probe_indices[qi].iter().enumerate() {
let coarse_dist = all_coarse_dists[qi][rank];
- list_to_queries[list_id].push((qi, coarse_dist));
+ let entry = list_to_queries.entry(list_id).or_insert_with(|| {
+ unique_lists.push(list_id);
+ Vec::new()
+ });
+ entry.push((qi, coarse_dist));
}
}
@@ -1202,10 +1207,7 @@ pub fn search_batch_reader<R: SeekRead>(
let mut heaps: Vec<TopKHeap> = (0..nq).map(|_| TopKHeap::new(k)).collect();
- for list_id in 0..reader.nlist {
- if list_to_queries[list_id].is_empty() {
- continue;
- }
+ for list_id in unique_lists {
let count = reader.list_counts[list_id] as usize;
if count == 0 {
continue;
@@ -1213,84 +1215,38 @@ pub fn search_batch_reader<R: SeekRead>(
// Read list once (shared across all queries that probe it)
let (ids, codes) = reader.read_inverted_list(list_id)?;
+ let mut entry = PreReadList {
+ list_id,
+ count,
+ dis0: 0.0,
+ ids,
+ codes,
+ };
- for &(qi, coarse_dist) in &list_to_queries[list_id] {
+ for &(qi, coarse_dist) in &list_to_queries[&list_id] {
let query = &processed[qi * d..(qi + 1) * d];
-
- let mut sim_table = vec![0.0f32; m * ksub];
- if use_precomputed {
- let tab_base = list_id * m * ksub;
- fvec_madd(
- &reader.precomputed_table[tab_base..tab_base + m * ksub],
- &all_ip_tables[qi],
- -2.0,
- &mut sim_table,
- );
- } else if by_residual {
- let mut residual_query = vec![0.0f32; d];
- for j in 0..d {
- residual_query[j] = query[j] - reader.quantizer_centroids[list_id * d + j];
- }
- reader
- .pq
- .compute_distance_table(&residual_query, metric, &mut sim_table);
- } else {
- reader
- .pq
- .compute_distance_table(query, metric, &mut sim_table);
- }
-
let dis0 = if use_precomputed { coarse_dist } else { 0.0 };
-
- let is_4bit = reader.pq.nbits == 4;
- if is_4bit && reader.transposed_codes {
- scan_codes_4bit_transposed(
- &sim_table,
- &codes,
- &ids,
- count,
- m,
- dis0,
- None,
- &mut heaps[qi],
- );
- } else if is_4bit {
- scan_codes_4bit(
- &sim_table,
- &codes,
- &ids,
- count,
- m,
- ksub,
- dis0,
- None,
- &mut heaps[qi],
- );
- } else if reader.transposed_codes {
- scan_codes_transposed(
- &sim_table,
- &codes,
- &ids,
- count,
- m,
- ksub,
- dis0,
- None,
- &mut heaps[qi],
- );
- } else {
- scan_codes_batched(
- &sim_table,
- &codes,
- &ids,
- count,
- m,
- ksub,
- dis0,
- None,
- &mut heaps[qi],
- );
- }
+ let ctx = ReaderSearchContext {
+ q: query,
+ ip_table: if use_precomputed {
+ &all_ip_tables[qi]
+ } else {
+ &[]
+ },
+ use_precomputed,
+ filter: None,
+ d,
+ m,
+ ksub,
+ metric,
+ by_residual,
+ transposed_codes: reader.transposed_codes,
+ pq: &reader.pq,
+ quantizer_centroids: &reader.quantizer_centroids,
+ precomputed_table: &reader.precomputed_table,
+ };
+ entry.dis0 = dis0;
+ scan_reader_list(&entry, &ctx, &mut heaps[qi]);
}
}
@@ -2073,6 +2029,46 @@ mod tests {
}
}
+ #[test]
+ fn test_batch_reader_matches_single_reader_search() {
+ use crate::io::{write_index, IVFPQIndexReader, PosWriter};
+ use std::io::Cursor;
+
+ let d = 16;
+ let nlist = 8;
+ let m = 4;
+ let n = 1000;
+ let k = 5;
+ let nq = 12;
+ let nprobe = 3;
+
+ let data = generate_clustered_data(n, d, 8, 42);
+ let ids: Vec<i64> = (0..n as i64).collect();
+
+ let mut index = IVFPQIndex::new(d, nlist, m, MetricType::L2, false);
+ index.train(&data, n);
+ index.add(&data, &ids, n);
+
+ let mut buf = Vec::new();
+ let mut writer = PosWriter::new(&mut buf);
+ write_index(&index, &mut writer).unwrap();
+
+ let queries = &data[..nq * d];
+ let mut batch_reader = IVFPQIndexReader::open(Cursor::new(buf.clone())).unwrap();
+ let (batch_ids, batch_dists) =
+ search_batch_reader(&mut batch_reader, queries, nq, k, nprobe).unwrap();
+
+ for qi in 0..nq {
+ let mut single_reader = IVFPQIndexReader::open(Cursor::new(buf.clone())).unwrap();
+ let query = &queries[qi * d..(qi + 1) * d];
+ let (single_ids, single_dists) = single_reader.search(query, k, nprobe).unwrap();
+ let base = qi * k;
+
+ assert_eq!(&batch_ids[base..base + k], &single_ids[..]);
+ assert_eq!(&batch_dists[base..base + k], &single_dists[..]);
+ }
+ }
+
#[test]
fn test_batch_reader_validates_inputs() {
use crate::io::{write_index, IVFPQIndexReader, PosWriter};