This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-vector-index.git


The following commit(s) were added to refs/heads/main by this push:
     new 3473e60  Reuse batch reader probed lists (#14)
3473e60 is described below

commit 3473e6056954b40a820555befe98eeb90293be26
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 8 19:20:27 2026 +0800

    Reuse batch reader probed lists (#14)
---
 core/src/ivfpq.rs | 162 ++++++++++++++++++++++++++----------------------------
 1 file changed, 79 insertions(+), 83 deletions(-)

diff --git a/core/src/ivfpq.rs b/core/src/ivfpq.rs
index 093997d..e9c0974 100644
--- a/core/src/ivfpq.rs
+++ b/core/src/ivfpq.rs
@@ -23,7 +23,7 @@ use crate::kmeans::{self, KMeansConfig};
 use crate::opq::OPQMatrix;
 use crate::pq::ProductQuantizer;
 use rayon::prelude::*;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::io;
 
 /// IVF-PQ index aligned with Faiss's IndexIVFPQ.
@@ -1173,12 +1173,17 @@ pub fn search_batch_reader<R: SeekRead>(
         nprobe,
     );
 
-    // Step 3: Group (query_idx, probe_rank) pairs by list_id
-    let mut list_to_queries: Vec<Vec<(usize, f32)>> = vec![Vec::new(); reader.nlist];
+    // Step 3: Group (query_idx, probe_rank) pairs by probed list_id only.
+    let mut list_to_queries: HashMap<usize, Vec<(usize, f32)>> = HashMap::new();
+    let mut unique_lists = Vec::new();
     for qi in 0..nq {
         for (rank, &list_id) in all_probe_indices[qi].iter().enumerate() {
             let coarse_dist = all_coarse_dists[qi][rank];
-            list_to_queries[list_id].push((qi, coarse_dist));
+            let entry = list_to_queries.entry(list_id).or_insert_with(|| {
+                unique_lists.push(list_id);
+                Vec::new()
+            });
+            entry.push((qi, coarse_dist));
         }
     }
 
@@ -1202,10 +1207,7 @@ pub fn search_batch_reader<R: SeekRead>(
 
     let mut heaps: Vec<TopKHeap> = (0..nq).map(|_| TopKHeap::new(k)).collect();
 
-    for list_id in 0..reader.nlist {
-        if list_to_queries[list_id].is_empty() {
-            continue;
-        }
+    for list_id in unique_lists {
         let count = reader.list_counts[list_id] as usize;
         if count == 0 {
             continue;
@@ -1213,84 +1215,38 @@ pub fn search_batch_reader<R: SeekRead>(
 
         // Read list once (shared across all queries that probe it)
         let (ids, codes) = reader.read_inverted_list(list_id)?;
+        let mut entry = PreReadList {
+            list_id,
+            count,
+            dis0: 0.0,
+            ids,
+            codes,
+        };
 
-        for &(qi, coarse_dist) in &list_to_queries[list_id] {
+        for &(qi, coarse_dist) in &list_to_queries[&list_id] {
             let query = &processed[qi * d..(qi + 1) * d];
-
-            let mut sim_table = vec![0.0f32; m * ksub];
-            if use_precomputed {
-                let tab_base = list_id * m * ksub;
-                fvec_madd(
-                    &reader.precomputed_table[tab_base..tab_base + m * ksub],
-                    &all_ip_tables[qi],
-                    -2.0,
-                    &mut sim_table,
-                );
-            } else if by_residual {
-                let mut residual_query = vec![0.0f32; d];
-                for j in 0..d {
-                    residual_query[j] = query[j] - reader.quantizer_centroids[list_id * d + j];
-                }
-                reader
-                    .pq
-                    .compute_distance_table(&residual_query, metric, &mut sim_table);
-            } else {
-                reader
-                    .pq
-                    .compute_distance_table(query, metric, &mut sim_table);
-            }
-
             let dis0 = if use_precomputed { coarse_dist } else { 0.0 };
-
-            let is_4bit = reader.pq.nbits == 4;
-            if is_4bit && reader.transposed_codes {
-                scan_codes_4bit_transposed(
-                    &sim_table,
-                    &codes,
-                    &ids,
-                    count,
-                    m,
-                    dis0,
-                    None,
-                    &mut heaps[qi],
-                );
-            } else if is_4bit {
-                scan_codes_4bit(
-                    &sim_table,
-                    &codes,
-                    &ids,
-                    count,
-                    m,
-                    ksub,
-                    dis0,
-                    None,
-                    &mut heaps[qi],
-                );
-            } else if reader.transposed_codes {
-                scan_codes_transposed(
-                    &sim_table,
-                    &codes,
-                    &ids,
-                    count,
-                    m,
-                    ksub,
-                    dis0,
-                    None,
-                    &mut heaps[qi],
-                );
-            } else {
-                scan_codes_batched(
-                    &sim_table,
-                    &codes,
-                    &ids,
-                    count,
-                    m,
-                    ksub,
-                    dis0,
-                    None,
-                    &mut heaps[qi],
-                );
-            }
+            let ctx = ReaderSearchContext {
+                q: query,
+                ip_table: if use_precomputed {
+                    &all_ip_tables[qi]
+                } else {
+                    &[]
+                },
+                use_precomputed,
+                filter: None,
+                d,
+                m,
+                ksub,
+                metric,
+                by_residual,
+                transposed_codes: reader.transposed_codes,
+                pq: &reader.pq,
+                quantizer_centroids: &reader.quantizer_centroids,
+                precomputed_table: &reader.precomputed_table,
+            };
+            entry.dis0 = dis0;
+            scan_reader_list(&entry, &ctx, &mut heaps[qi]);
         }
     }
 
@@ -2073,6 +2029,46 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_batch_reader_matches_single_reader_search() {
+        use crate::io::{write_index, IVFPQIndexReader, PosWriter};
+        use std::io::Cursor;
+
+        let d = 16;
+        let nlist = 8;
+        let m = 4;
+        let n = 1000;
+        let k = 5;
+        let nq = 12;
+        let nprobe = 3;
+
+        let data = generate_clustered_data(n, d, 8, 42);
+        let ids: Vec<i64> = (0..n as i64).collect();
+
+        let mut index = IVFPQIndex::new(d, nlist, m, MetricType::L2, false);
+        index.train(&data, n);
+        index.add(&data, &ids, n);
+
+        let mut buf = Vec::new();
+        let mut writer = PosWriter::new(&mut buf);
+        write_index(&index, &mut writer).unwrap();
+
+        let queries = &data[..nq * d];
+        let mut batch_reader = IVFPQIndexReader::open(Cursor::new(buf.clone())).unwrap();
+        let (batch_ids, batch_dists) =
+            search_batch_reader(&mut batch_reader, queries, nq, k, nprobe).unwrap();
+
+        for qi in 0..nq {
+            let mut single_reader = IVFPQIndexReader::open(Cursor::new(buf.clone())).unwrap();
+            let query = &queries[qi * d..(qi + 1) * d];
+            let (single_ids, single_dists) = single_reader.search(query, k, nprobe).unwrap();
+            let base = qi * k;
+
+            assert_eq!(&batch_ids[base..base + k], &single_ids[..]);
+            assert_eq!(&batch_dists[base..base + k], &single_dists[..]);
+        }
+    }
+
     #[test]
     fn test_batch_reader_validates_inputs() {
         use crate::io::{write_index, IVFPQIndexReader, PosWriter};

Reply via email to