This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-vector-index.git


The following commit(s) were added to refs/heads/main by this push:
     new cf73d26  Stream reader search for non-concurrent pread (#12)
cf73d26 is described below

commit cf73d26a6c326d9997716304c197bb295d758077
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 8 18:59:20 2026 +0800

    Stream reader search for non-concurrent pread (#12)
---
 core/src/io.rs    |   4 +
 core/src/ivfpq.rs | 426 ++++++++++++++++++++++++++++++++++++++++--------------
 2 files changed, 324 insertions(+), 106 deletions(-)

diff --git a/core/src/io.rs b/core/src/io.rs
index eb53db9..bf1997d 100644
--- a/core/src/io.rs
+++ b/core/src/io.rs
@@ -798,6 +798,10 @@ impl<R: SeekRead> IVFPQIndexReader<R> {
         self.ensure_loaded()?;
         crate::ivfpq::search_with_reader(self, query, k, nprobe)
     }
+
+    pub fn supports_concurrent_pread(&self) -> bool {
+        self.reader.supports_concurrent_pread()
+    }
 }
 
 #[allow(dead_code)]
diff --git a/core/src/ivfpq.rs b/core/src/ivfpq.rs
index 20ef044..9a1ec7e 100644
--- a/core/src/ivfpq.rs
+++ b/core/src/ivfpq.rs
@@ -835,6 +835,22 @@ struct PreReadList {
     codes: Vec<u8>,
 }
 
+struct ReaderSearchContext<'a> {
+    q: &'a [f32],
+    ip_table: &'a [f32],
+    use_precomputed: bool,
+    filter: Option<&'a HashSet<i64>>,
+    d: usize,
+    m: usize,
+    ksub: usize,
+    metric: MetricType,
+    by_residual: bool,
+    transposed_codes: bool,
+    pq: &'a crate::pq::ProductQuantizer,
+    quantizer_centroids: &'a [f32],
+    precomputed_table: &'a [f32],
+}
+
 /// Search using a lazy reader (reads inverted lists on demand).
 pub fn search_with_reader<R: SeekRead>(
     reader: &mut IVFPQIndexReader<R>,
@@ -855,6 +871,29 @@ pub fn search_with_reader_filter<R: SeekRead>(
 ) -> io::Result<(Vec<i64>, Vec<f32>)> {
     reader.ensure_loaded()?;
     let d = reader.d;
+    if query.len() != d {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            format!(
+                "query length {} does not match index dimension {}",
+                query.len(),
+                d
+            ),
+        ));
+    }
+    if k == 0 {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            "k must be greater than 0",
+        ));
+    }
+    if nprobe == 0 {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            "nprobe must be greater than 0",
+        ));
+    }
+
     let m = reader.m;
     let ksub = reader.ksub;
     let metric = reader.metric;
@@ -884,115 +923,95 @@ pub fn search_with_reader_filter<R: SeekRead>(
         Vec::new()
     };
 
-    // Pre-read all inverted lists upfront so we can scan in parallel
-    let mut list_data: Vec<PreReadList> = Vec::new();
-    for (probe_idx, &list_id) in probe_indices.iter().enumerate() {
-        let count = reader.list_counts[list_id] as usize;
-        if count == 0 {
-            continue;
-        }
-        let dis0 = if use_precomputed {
-            coarse_dists[probe_idx]
-        } else {
-            0.0
-        };
-        let (ids, codes) = reader.read_inverted_list(list_id)?;
-        list_data.push(PreReadList {
-            list_id,
-            count,
-            dis0,
-            ids,
-            codes,
-        });
-    }
-
-    // Parallel scan across pre-read inverted lists
-    let per_list_results: Vec<Vec<(f32, i64)>> = list_data
-        .par_iter()
-        .map(|entry| {
-            let mut sim_table = vec![0.0f32; m * ksub];
+    let mut heap = TopKHeap::new(k);
 
-            if use_precomputed {
-                let tab_base = entry.list_id * m * ksub;
-                fvec_madd(
-                    &reader.precomputed_table[tab_base..tab_base + m * ksub],
-                    &ip_table,
-                    -2.0,
-                    &mut sim_table,
-                );
-            } else if by_residual {
-                let mut residual_query = vec![0.0f32; d];
-                for j in 0..d {
-                    residual_query[j] = q[j] - reader.quantizer_centroids[entry.list_id * d + j];
-                }
-                reader
-                    .pq
-                    .compute_distance_table(&residual_query, metric, &mut sim_table);
-            } else {
-                reader.pq.compute_distance_table(&q, metric, &mut sim_table);
+    if reader.supports_concurrent_pread() {
+        // Pre-read all inverted lists upfront so we can scan them in parallel.
+        let mut list_data: Vec<PreReadList> = Vec::new();
+        for (probe_idx, &list_id) in probe_indices.iter().enumerate() {
+            let count = reader.list_counts[list_id] as usize;
+            if count == 0 {
+                continue;
             }
+            let dis0 = if use_precomputed {
+                coarse_dists[probe_idx]
+            } else {
+                0.0
+            };
+            let (ids, codes) = reader.read_inverted_list(list_id)?;
+            list_data.push(PreReadList {
+                list_id,
+                count,
+                dis0,
+                ids,
+                codes,
+            });
+        }
 
-            let mut local_heap = TopKHeap::new(k);
-            let use_transposed = reader.transposed_codes;
-            let is_4bit = reader.pq.nbits == 4;
+        let ctx = ReaderSearchContext {
+            q: &q,
+            ip_table: &ip_table,
+            use_precomputed,
+            filter,
+            d,
+            m,
+            ksub,
+            metric,
+            by_residual,
+            transposed_codes: reader.transposed_codes,
+            pq: &reader.pq,
+            quantizer_centroids: &reader.quantizer_centroids,
+            precomputed_table: &reader.precomputed_table,
+        };
+        let per_list_results: Vec<Vec<(f32, i64)>> = list_data
+            .par_iter()
+            .map(|entry| {
+                let mut local_heap = TopKHeap::new(k);
+                scan_reader_list(entry, &ctx, &mut local_heap);
+                local_heap.into_sorted()
+            })
+            .collect();
 
-            if is_4bit && use_transposed {
-                scan_codes_4bit_transposed(
-                    &sim_table,
-                    &entry.codes,
-                    &entry.ids,
-                    entry.count,
-                    m,
-                    entry.dis0,
-                    filter,
-                    &mut local_heap,
-                );
-            } else if is_4bit {
-                scan_codes_4bit(
-                    &sim_table,
-                    &entry.codes,
-                    &entry.ids,
-                    entry.count,
-                    m,
-                    ksub,
-                    entry.dis0,
-                    filter,
-                    &mut local_heap,
-                );
-            } else if use_transposed {
-                scan_codes_transposed(
-                    &sim_table,
-                    &entry.codes,
-                    &entry.ids,
-                    entry.count,
-                    m,
-                    ksub,
-                    entry.dis0,
-                    filter,
-                    &mut local_heap,
-                );
-            } else {
-                scan_codes_batched(
-                    &sim_table,
-                    &entry.codes,
-                    &entry.ids,
-                    entry.count,
-                    m,
-                    ksub,
-                    entry.dis0,
-                    filter,
-                    &mut local_heap,
-                );
+        for results in per_list_results {
+            for (dist, id) in results {
+                heap.push(dist, id);
             }
-            local_heap.into_sorted()
-        })
-        .collect();
-
-    // Merge per-list heaps
-    let mut heap = TopKHeap::new(k);
-    for results in per_list_results {
-        for (dist, id) in results {
-            heap.push(dist, id);
+        }
+    } else {
+        for (probe_idx, &list_id) in probe_indices.iter().enumerate() {
+            let count = reader.list_counts[list_id] as usize;
+            if count == 0 {
+                continue;
+            }
+            let dis0 = if use_precomputed {
+                coarse_dists[probe_idx]
+            } else {
+                0.0
+            };
+            let (ids, codes) = reader.read_inverted_list(list_id)?;
+            let entry = PreReadList {
+                list_id,
+                count,
+                dis0,
+                ids,
+                codes,
+            };
+            let ctx = ReaderSearchContext {
+                q: &q,
+                ip_table: &ip_table,
+                use_precomputed,
+                filter,
+                d,
+                m,
+                ksub,
+                metric,
+                by_residual,
+                transposed_codes: reader.transposed_codes,
+                pq: &reader.pq,
+                quantizer_centroids: &reader.quantizer_centroids,
+                precomputed_table: &reader.precomputed_table,
+            };
+            scan_reader_list(&entry, &ctx, &mut heap);
         }
     }
 
@@ -1003,6 +1022,83 @@ pub fn search_with_reader_filter<R: SeekRead>(
     Ok((result_ids, result_dists))
 }
 
+fn scan_reader_list(entry: &PreReadList, ctx: &ReaderSearchContext<'_>, heap: &mut TopKHeap) {
+    let d = ctx.d;
+    let m = ctx.m;
+    let ksub = ctx.ksub;
+    let metric = ctx.metric;
+    let mut sim_table = vec![0.0f32; m * ksub];
+
+    if ctx.use_precomputed {
+        let tab_base = entry.list_id * m * ksub;
+        fvec_madd(
+            &ctx.precomputed_table[tab_base..tab_base + m * ksub],
+            ctx.ip_table,
+            -2.0,
+            &mut sim_table,
+        );
+    } else if ctx.by_residual {
+        let mut residual_query = vec![0.0f32; d];
+        for j in 0..d {
+            residual_query[j] = ctx.q[j] - ctx.quantizer_centroids[entry.list_id * d + j];
+        }
+        ctx.pq
+            .compute_distance_table(&residual_query, metric, &mut sim_table);
+    } else {
+        ctx.pq.compute_distance_table(ctx.q, metric, &mut sim_table);
+    }
+
+    let is_4bit = ctx.pq.nbits == 4;
+    if is_4bit && ctx.transposed_codes {
+        scan_codes_4bit_transposed(
+            &sim_table,
+            &entry.codes,
+            &entry.ids,
+            entry.count,
+            m,
+            entry.dis0,
+            ctx.filter,
+            heap,
+        );
+    } else if is_4bit {
+        scan_codes_4bit(
+            &sim_table,
+            &entry.codes,
+            &entry.ids,
+            entry.count,
+            m,
+            ksub,
+            entry.dis0,
+            ctx.filter,
+            heap,
+        );
+    } else if ctx.transposed_codes {
+        scan_codes_transposed(
+            &sim_table,
+            &entry.codes,
+            &entry.ids,
+            entry.count,
+            m,
+            ksub,
+            entry.dis0,
+            ctx.filter,
+            heap,
+        );
+    } else {
+        scan_codes_batched(
+            &sim_table,
+            &entry.codes,
+            &entry.ids,
+            entry.count,
+            m,
+            ksub,
+            entry.dis0,
+            ctx.filter,
+            heap,
+        );
+    }
+}
+
 /// Big batch search: batch queries share list reads.
 /// Instead of nq*nprobe I/O ops, reads each unique list once and scans for all queries.
 pub fn search_batch_reader<R: SeekRead>(
@@ -1269,8 +1365,52 @@ fn sift_down(heap: &mut [(f32, i64)], mut i: usize) {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::io::SeekRead;
     use rand::rngs::StdRng;
     use rand::{Rng, SeedableRng};
+    use std::io::Cursor;
+    use std::sync::{Arc, Mutex};
+
+    #[derive(Default)]
+    struct ReaderStats {
+        pread_calls: usize,
+        max_pread_len: usize,
+    }
+
+    struct NonConcurrentPreadCursor {
+        inner: Cursor<Vec<u8>>,
+        stats: Arc<Mutex<ReaderStats>>,
+    }
+
+    impl NonConcurrentPreadCursor {
+        fn new(data: Vec<u8>, stats: Arc<Mutex<ReaderStats>>) -> Self {
+            NonConcurrentPreadCursor {
+                inner: Cursor::new(data),
+                stats,
+            }
+        }
+    }
+
+    impl SeekRead for NonConcurrentPreadCursor {
+        fn seek(&mut self, pos: u64) -> io::Result<()> {
+            io::Seek::seek(&mut self.inner, io::SeekFrom::Start(pos))?;
+            Ok(())
+        }
+
+        fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
+            io::Read::read_exact(&mut self.inner, buf)
+        }
+
+        fn pread(&mut self, pos: u64, buf: &mut [u8]) -> io::Result<()> {
+            {
+                let mut stats = self.stats.lock().unwrap();
+                stats.pread_calls += 1;
+                stats.max_pread_len = stats.max_pread_len.max(buf.len());
+            }
+            io::Seek::seek(&mut self.inner, io::SeekFrom::Start(pos))?;
+            io::Read::read_exact(&mut self.inner, buf)
+        }
+    }
 
     fn generate_clustered_data(n: usize, d: usize, num_clusters: usize, seed: u64) -> Vec<f32> {
         let mut rng = StdRng::seed_from_u64(seed);
@@ -1717,7 +1857,6 @@ mod tests {
     #[test]
     fn test_write_read_search() {
         use crate::io::{write_index, IVFPQIndexReader, PosWriter};
-        use std::io::Cursor;
 
         let d = 16;
         let nlist = 4;
@@ -1748,10 +1887,85 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_reader_search_works_without_concurrent_pread() {
+        use crate::io::{write_index, IVFPQIndexReader, PosWriter};
+
+        let d = 16;
+        let nlist = 8;
+        let m = 4;
+        let n = 800;
+        let k = 5;
+        let nprobe = 4;
+
+        let data = generate_clustered_data(n, d, 8, 789);
+        let ids: Vec<i64> = (0..n as i64).collect();
+
+        let mut index = IVFPQIndex::new(d, nlist, m, MetricType::L2, false);
+        index.train(&data, n);
+        index.add(&data, &ids, n);
+
+        let mut buf = Vec::new();
+        let mut writer = PosWriter::new(&mut buf);
+        write_index(&index, &mut writer).unwrap();
+
+        let mut baseline_reader = IVFPQIndexReader::open(Cursor::new(buf.clone())).unwrap();
+        let (baseline_ids, baseline_dists) =
+            baseline_reader.search(&data[0..d], k, nprobe).unwrap();
+
+        let stats = Arc::new(Mutex::new(ReaderStats::default()));
+        let stream = NonConcurrentPreadCursor::new(buf, Arc::clone(&stats));
+        let mut reader = IVFPQIndexReader::open(stream).unwrap();
+        assert!(!reader.supports_concurrent_pread());
+
+        let (ids, dists) = reader.search(&data[0..d], k, nprobe).unwrap();
+
+        assert_eq!(ids, baseline_ids);
+        assert_eq!(dists, baseline_dists);
+        assert!(
+            stats.lock().unwrap().pread_calls > 0,
+            "search should still read inverted lists through pread fallback"
+        );
+    }
+
+    #[test]
+    fn test_reader_search_validates_inputs() {
+        use crate::io::{write_index, IVFPQIndexReader, PosWriter};
+
+        let d = 16;
+        let nlist = 4;
+        let m = 4;
+        let n = 500;
+
+        let data = generate_clustered_data(n, d, 4, 789);
+        let ids: Vec<i64> = (0..n as i64).collect();
+
+        let mut index = IVFPQIndex::new(d, nlist, m, MetricType::L2, false);
+        index.train(&data, n);
+        index.add(&data, &ids, n);
+
+        let mut buf = Vec::new();
+        let mut writer = PosWriter::new(&mut buf);
+        write_index(&index, &mut writer).unwrap();
+
+        let mut reader = IVFPQIndexReader::open(Cursor::new(buf)).unwrap();
+
+        let err = reader.search(&data[0..d - 1], 5, 2).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+
+        let err = reader.search(&data[0..d + 1], 5, 2).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+
+        let err = reader.search(&data[0..d], 0, 2).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+
+        let err = reader.search(&data[0..d], 5, 0).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+    }
+
     #[test]
     fn test_write_read_search_with_filter() {
         use crate::io::{write_index, IVFPQIndexReader, PosWriter};
-        use std::io::Cursor;
 
         let d = 16;
         let nlist = 4;

Reply via email to