This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-vector-index.git
The following commit(s) were added to refs/heads/main by this push:
new cf73d26 Stream reader search for non-concurrent pread (#12)
cf73d26 is described below
commit cf73d26a6c326d9997716304c197bb295d758077
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 8 18:59:20 2026 +0800
Stream reader search for non-concurrent pread (#12)
---
core/src/io.rs | 4 +
core/src/ivfpq.rs | 426 ++++++++++++++++++++++++++++++++++++++++--------------
2 files changed, 324 insertions(+), 106 deletions(-)
diff --git a/core/src/io.rs b/core/src/io.rs
index eb53db9..bf1997d 100644
--- a/core/src/io.rs
+++ b/core/src/io.rs
@@ -798,6 +798,10 @@ impl<R: SeekRead> IVFPQIndexReader<R> {
self.ensure_loaded()?;
crate::ivfpq::search_with_reader(self, query, k, nprobe)
}
+
+    /// Returns whether the underlying reader supports concurrent positional reads.
+    ///
+    /// Delegates to the wrapped reader. `search_with_reader_filter` uses this to
+    /// choose between pre-reading every probed list for a parallel scan and
+    /// reading/scanning the lists one at a time.
+    pub fn supports_concurrent_pread(&self) -> bool {
+        self.reader.supports_concurrent_pread()
+    }
}
#[allow(dead_code)]
diff --git a/core/src/ivfpq.rs b/core/src/ivfpq.rs
index 20ef044..9a1ec7e 100644
--- a/core/src/ivfpq.rs
+++ b/core/src/ivfpq.rs
@@ -835,6 +835,22 @@ struct PreReadList {
codes: Vec<u8>,
}
+/// Per-query state shared by every inverted-list scan of a reader-based search.
+///
+/// Bundles the query, its derived tables, and borrowed index parameters so that
+/// `scan_reader_list` can be driven from both the parallel (pre-read) path and
+/// the sequential (one list at a time) path without repeating the argument list.
+struct ReaderSearchContext<'a> {
+    /// Query vector; indexed over `0..d` when building residuals.
+    q: &'a [f32],
+    /// Query-dependent table combined with `precomputed_table` when `use_precomputed` is set.
+    ip_table: &'a [f32],
+    /// Build the distance table from `precomputed_table` and `ip_table` instead of from `q`.
+    use_precomputed: bool,
+    /// Optional id set forwarded unchanged to the scan kernels.
+    filter: Option<&'a HashSet<i64>>,
+    /// Index dimension.
+    d: usize,
+    /// PQ parameters; the per-list distance table has `m * ksub` entries.
+    m: usize,
+    ksub: usize,
+    /// Metric passed to `compute_distance_table`.
+    metric: MetricType,
+    /// Compute distances against the residual `q - centroid(list)`.
+    by_residual: bool,
+    /// Selects the transposed-layout scan kernels.
+    transposed_codes: bool,
+    pq: &'a crate::pq::ProductQuantizer,
+    /// Centroids indexed as `list_id * d + j`.
+    quantizer_centroids: &'a [f32],
+    /// Per-list tables indexed as `list_id * m * ksub ..`.
+    precomputed_table: &'a [f32],
+}
+
/// Search using a lazy reader (reads inverted lists on demand).
pub fn search_with_reader<R: SeekRead>(
reader: &mut IVFPQIndexReader<R>,
@@ -855,6 +871,29 @@ pub fn search_with_reader_filter<R: SeekRead>(
) -> io::Result<(Vec<i64>, Vec<f32>)> {
reader.ensure_loaded()?;
let d = reader.d;
+ if query.len() != d {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!(
+ "query length {} does not match index dimension {}",
+ query.len(),
+ d
+ ),
+ ));
+ }
+ if k == 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "k must be greater than 0",
+ ));
+ }
+ if nprobe == 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "nprobe must be greater than 0",
+ ));
+ }
+
let m = reader.m;
let ksub = reader.ksub;
let metric = reader.metric;
@@ -884,115 +923,95 @@ pub fn search_with_reader_filter<R: SeekRead>(
Vec::new()
};
- // Pre-read all inverted lists upfront so we can scan in parallel
- let mut list_data: Vec<PreReadList> = Vec::new();
- for (probe_idx, &list_id) in probe_indices.iter().enumerate() {
- let count = reader.list_counts[list_id] as usize;
- if count == 0 {
- continue;
- }
- let dis0 = if use_precomputed {
- coarse_dists[probe_idx]
- } else {
- 0.0
- };
- let (ids, codes) = reader.read_inverted_list(list_id)?;
- list_data.push(PreReadList {
- list_id,
- count,
- dis0,
- ids,
- codes,
- });
- }
-
- // Parallel scan across pre-read inverted lists
- let per_list_results: Vec<Vec<(f32, i64)>> = list_data
- .par_iter()
- .map(|entry| {
- let mut sim_table = vec![0.0f32; m * ksub];
+ let mut heap = TopKHeap::new(k);
- if use_precomputed {
- let tab_base = entry.list_id * m * ksub;
- fvec_madd(
- &reader.precomputed_table[tab_base..tab_base + m * ksub],
- &ip_table,
- -2.0,
- &mut sim_table,
- );
- } else if by_residual {
- let mut residual_query = vec![0.0f32; d];
- for j in 0..d {
- residual_query[j] = q[j] -
reader.quantizer_centroids[entry.list_id * d + j];
- }
- reader
- .pq
- .compute_distance_table(&residual_query, metric, &mut
sim_table);
- } else {
- reader.pq.compute_distance_table(&q, metric, &mut sim_table);
+ if reader.supports_concurrent_pread() {
+ // Pre-read all inverted lists upfront so we can scan them in parallel.
+ let mut list_data: Vec<PreReadList> = Vec::new();
+ for (probe_idx, &list_id) in probe_indices.iter().enumerate() {
+ let count = reader.list_counts[list_id] as usize;
+ if count == 0 {
+ continue;
}
+ let dis0 = if use_precomputed {
+ coarse_dists[probe_idx]
+ } else {
+ 0.0
+ };
+ let (ids, codes) = reader.read_inverted_list(list_id)?;
+ list_data.push(PreReadList {
+ list_id,
+ count,
+ dis0,
+ ids,
+ codes,
+ });
+ }
- let mut local_heap = TopKHeap::new(k);
- let use_transposed = reader.transposed_codes;
- let is_4bit = reader.pq.nbits == 4;
+ let ctx = ReaderSearchContext {
+ q: &q,
+ ip_table: &ip_table,
+ use_precomputed,
+ filter,
+ d,
+ m,
+ ksub,
+ metric,
+ by_residual,
+ transposed_codes: reader.transposed_codes,
+ pq: &reader.pq,
+ quantizer_centroids: &reader.quantizer_centroids,
+ precomputed_table: &reader.precomputed_table,
+ };
+ let per_list_results: Vec<Vec<(f32, i64)>> = list_data
+ .par_iter()
+ .map(|entry| {
+ let mut local_heap = TopKHeap::new(k);
+ scan_reader_list(entry, &ctx, &mut local_heap);
+ local_heap.into_sorted()
+ })
+ .collect();
- if is_4bit && use_transposed {
- scan_codes_4bit_transposed(
- &sim_table,
- &entry.codes,
- &entry.ids,
- entry.count,
- m,
- entry.dis0,
- filter,
- &mut local_heap,
- );
- } else if is_4bit {
- scan_codes_4bit(
- &sim_table,
- &entry.codes,
- &entry.ids,
- entry.count,
- m,
- ksub,
- entry.dis0,
- filter,
- &mut local_heap,
- );
- } else if use_transposed {
- scan_codes_transposed(
- &sim_table,
- &entry.codes,
- &entry.ids,
- entry.count,
- m,
- ksub,
- entry.dis0,
- filter,
- &mut local_heap,
- );
- } else {
- scan_codes_batched(
- &sim_table,
- &entry.codes,
- &entry.ids,
- entry.count,
- m,
- ksub,
- entry.dis0,
- filter,
- &mut local_heap,
- );
+ for results in per_list_results {
+ for (dist, id) in results {
+ heap.push(dist, id);
}
- local_heap.into_sorted()
- })
- .collect();
-
- // Merge per-list heaps
- let mut heap = TopKHeap::new(k);
- for results in per_list_results {
- for (dist, id) in results {
- heap.push(dist, id);
+ }
+ } else {
+ for (probe_idx, &list_id) in probe_indices.iter().enumerate() {
+ let count = reader.list_counts[list_id] as usize;
+ if count == 0 {
+ continue;
+ }
+ let dis0 = if use_precomputed {
+ coarse_dists[probe_idx]
+ } else {
+ 0.0
+ };
+ let (ids, codes) = reader.read_inverted_list(list_id)?;
+ let entry = PreReadList {
+ list_id,
+ count,
+ dis0,
+ ids,
+ codes,
+ };
+ let ctx = ReaderSearchContext {
+ q: &q,
+ ip_table: &ip_table,
+ use_precomputed,
+ filter,
+ d,
+ m,
+ ksub,
+ metric,
+ by_residual,
+ transposed_codes: reader.transposed_codes,
+ pq: &reader.pq,
+ quantizer_centroids: &reader.quantizer_centroids,
+ precomputed_table: &reader.precomputed_table,
+ };
+ scan_reader_list(&entry, &ctx, &mut heap);
}
}
@@ -1003,6 +1022,83 @@ pub fn search_with_reader_filter<R: SeekRead>(
Ok((result_ids, result_dists))
}
+/// Scans one inverted list and pushes its (distance, id) candidates into `heap`.
+///
+/// First builds this list's `m * ksub` distance table:
+/// - from the precomputed table when `ctx.use_precomputed` is set;
+/// - from the residual `q - centroid(list)` when `ctx.by_residual` is set;
+/// - from `q` directly otherwise.
+///
+/// It then dispatches to the scan kernel matching the code layout (4-bit vs.
+/// other code widths, transposed vs. non-transposed).
+///
+/// Shared by the parallel (pre-read) and sequential (streaming) search paths,
+/// so both produce identical candidates for the same list.
+fn scan_reader_list(entry: &PreReadList, ctx: &ReaderSearchContext<'_>, heap: &mut TopKHeap) {
+    let d = ctx.d;
+    let m = ctx.m;
+    let ksub = ctx.ksub;
+    let metric = ctx.metric;
+    let mut sim_table = vec![0.0f32; m * ksub];
+
+    if ctx.use_precomputed {
+        // Combine this list's slice of the precomputed table with the
+        // query-dependent `ip_table`, scaled by -2.0.
+        let tab_base = entry.list_id * m * ksub;
+        fvec_madd(
+            &ctx.precomputed_table[tab_base..tab_base + m * ksub],
+            ctx.ip_table,
+            -2.0,
+            &mut sim_table,
+        );
+    } else if ctx.by_residual {
+        // Distances are taken against the query's residual w.r.t. this list's
+        // quantizer centroid. `q` has length `d` (validated by the caller).
+        let centroid = &ctx.quantizer_centroids[entry.list_id * d..(entry.list_id + 1) * d];
+        let residual_query: Vec<f32> =
+            ctx.q.iter().zip(centroid).map(|(&x, &c)| x - c).collect();
+        ctx.pq
+            .compute_distance_table(&residual_query, metric, &mut sim_table);
+    } else {
+        ctx.pq.compute_distance_table(ctx.q, metric, &mut sim_table);
+    }
+
+    let is_4bit = ctx.pq.nbits == 4;
+    if is_4bit && ctx.transposed_codes {
+        scan_codes_4bit_transposed(
+            &sim_table,
+            &entry.codes,
+            &entry.ids,
+            entry.count,
+            m,
+            entry.dis0,
+            ctx.filter,
+            heap,
+        );
+    } else if is_4bit {
+        scan_codes_4bit(
+            &sim_table,
+            &entry.codes,
+            &entry.ids,
+            entry.count,
+            m,
+            ksub,
+            entry.dis0,
+            ctx.filter,
+            heap,
+        );
+    } else if ctx.transposed_codes {
+        scan_codes_transposed(
+            &sim_table,
+            &entry.codes,
+            &entry.ids,
+            entry.count,
+            m,
+            ksub,
+            entry.dis0,
+            ctx.filter,
+            heap,
+        );
+    } else {
+        scan_codes_batched(
+            &sim_table,
+            &entry.codes,
+            &entry.ids,
+            entry.count,
+            m,
+            ksub,
+            entry.dis0,
+            ctx.filter,
+            heap,
+        );
+    }
+}
+
/// Big batch search: batch queries share list reads.
/// Instead of nq*nprobe I/O ops, reads each unique list once and scans for
all queries.
pub fn search_batch_reader<R: SeekRead>(
@@ -1269,8 +1365,52 @@ fn sift_down(heap: &mut [(f32, i64)], mut i: usize) {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::io::SeekRead;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
+ use std::io::Cursor;
+ use std::sync::{Arc, Mutex};
+
+    /// Counters recorded by `NonConcurrentPreadCursor` so tests can observe I/O.
+    #[derive(Default)]
+    struct ReaderStats {
+        /// Number of `pread` calls made so far.
+        pread_calls: usize,
+        /// Largest buffer length requested by any single `pread`.
+        max_pread_len: usize,
+    }
+
+    /// In-memory `SeekRead` test double that does not override
+    /// `supports_concurrent_pread` and records `pread` activity in shared stats.
+    struct NonConcurrentPreadCursor {
+        inner: Cursor<Vec<u8>>,
+        /// Shared with the test so stats stay readable after the reader takes ownership.
+        stats: Arc<Mutex<ReaderStats>>,
+    }
+
+    impl NonConcurrentPreadCursor {
+        /// Wraps `data` in a cursor positioned at offset 0.
+        fn new(data: Vec<u8>, stats: Arc<Mutex<ReaderStats>>) -> Self {
+            NonConcurrentPreadCursor {
+                inner: Cursor::new(data),
+                stats,
+            }
+        }
+    }
+
+    // NOTE(review): `supports_concurrent_pread` is not overridden here; the test
+    // asserts it returns false, so this presumably relies on the trait default.
+    impl SeekRead for NonConcurrentPreadCursor {
+        /// Moves the cursor to absolute offset `pos`.
+        fn seek(&mut self, pos: u64) -> io::Result<()> {
+            io::Seek::seek(&mut self.inner, io::SeekFrom::Start(pos))?;
+            Ok(())
+        }
+
+        /// Fills `buf` from the current cursor position.
+        fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
+            io::Read::read_exact(&mut self.inner, buf)
+        }
+
+        /// Records the call, then reads `buf.len()` bytes starting at `pos`.
+        ///
+        /// NOTE(review): unlike POSIX pread this leaves the cursor moved to
+        /// `pos + buf.len()`; confirm that the `SeekRead` contract permits that.
+        fn pread(&mut self, pos: u64, buf: &mut [u8]) -> io::Result<()> {
+            // Scope the lock so it is released before doing the read.
+            {
+                let mut stats = self.stats.lock().unwrap();
+                stats.pread_calls += 1;
+                stats.max_pread_len = stats.max_pread_len.max(buf.len());
+            }
+            io::Seek::seek(&mut self.inner, io::SeekFrom::Start(pos))?;
+            io::Read::read_exact(&mut self.inner, buf)
+        }
+    }
fn generate_clustered_data(n: usize, d: usize, num_clusters: usize, seed:
u64) -> Vec<f32> {
let mut rng = StdRng::seed_from_u64(seed);
@@ -1717,7 +1857,6 @@ mod tests {
#[test]
fn test_write_read_search() {
use crate::io::{write_index, IVFPQIndexReader, PosWriter};
- use std::io::Cursor;
let d = 16;
let nlist = 4;
@@ -1748,10 +1887,85 @@ mod tests {
}
}
+    /// A reader without concurrent pread must take the sequential search path
+    /// and return exactly the same results as a plain `Cursor` reader.
+    #[test]
+    fn test_reader_search_works_without_concurrent_pread() {
+        use crate::io::{write_index, IVFPQIndexReader, PosWriter};
+
+        let d = 16;
+        let nlist = 8;
+        let m = 4;
+        let n = 800;
+        let k = 5;
+        let nprobe = 4;
+
+        let data = generate_clustered_data(n, d, 8, 789);
+        let ids: Vec<i64> = (0..n as i64).collect();
+
+        let mut index = IVFPQIndex::new(d, nlist, m, MetricType::L2, false);
+        index.train(&data, n);
+        index.add(&data, &ids, n);
+
+        let mut buf = Vec::new();
+        let mut writer = PosWriter::new(&mut buf);
+        write_index(&index, &mut writer).unwrap();
+
+        // Baseline results from a plain in-memory `Cursor` reader.
+        let mut baseline_reader = IVFPQIndexReader::open(Cursor::new(buf.clone())).unwrap();
+        let (baseline_ids, baseline_dists) =
+            baseline_reader.search(&data[0..d], k, nprobe).unwrap();
+
+        let stats = Arc::new(Mutex::new(ReaderStats::default()));
+        let stream = NonConcurrentPreadCursor::new(buf, Arc::clone(&stats));
+        let mut reader = IVFPQIndexReader::open(stream).unwrap();
+        assert!(!reader.supports_concurrent_pread());
+
+        // Measure preads issued by `search` itself, so that any preads made
+        // while opening the reader cannot satisfy the assertion below.
+        let pread_calls_before = stats.lock().unwrap().pread_calls;
+        let (ids, dists) = reader.search(&data[0..d], k, nprobe).unwrap();
+        let pread_calls_during_search = stats.lock().unwrap().pread_calls - pread_calls_before;
+
+        assert_eq!(ids, baseline_ids);
+        assert_eq!(dists, baseline_dists);
+        assert!(
+            pread_calls_during_search > 0,
+            "search should still read inverted lists through pread fallback"
+        );
+    }
+
+    /// Invalid query length, `k == 0`, and `nprobe == 0` are rejected with
+    /// `InvalidInput`, while a valid search on the same reader still succeeds.
+    #[test]
+    fn test_reader_search_validates_inputs() {
+        use crate::io::{write_index, IVFPQIndexReader, PosWriter};
+
+        let d = 16;
+        let nlist = 4;
+        let m = 4;
+        let n = 500;
+
+        let data = generate_clustered_data(n, d, 4, 789);
+        let ids: Vec<i64> = (0..n as i64).collect();
+
+        let mut index = IVFPQIndex::new(d, nlist, m, MetricType::L2, false);
+        index.train(&data, n);
+        index.add(&data, &ids, n);
+
+        let mut buf = Vec::new();
+        let mut writer = PosWriter::new(&mut buf);
+        write_index(&index, &mut writer).unwrap();
+
+        let mut reader = IVFPQIndexReader::open(Cursor::new(buf)).unwrap();
+
+        // Query one element too short, then one too long.
+        let err = reader.search(&data[0..d - 1], 5, 2).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+
+        let err = reader.search(&data[0..d + 1], 5, 2).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+
+        let err = reader.search(&data[0..d], 0, 2).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+
+        let err = reader.search(&data[0..d], 5, 0).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+
+        // Positive control: without it, this test would pass even if every
+        // search were rejected.
+        let (ids, dists) = reader.search(&data[0..d], 5, 2).unwrap();
+        assert_eq!(ids.len(), dists.len());
+        assert!(ids.len() <= 5);
+    }
+
#[test]
fn test_write_read_search_with_filter() {
use crate::io::{write_index, IVFPQIndexReader, PosWriter};
- use std::io::Cursor;
let d = 16;
let nlist = 4;