This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-vector-index.git
The following commit(s) were added to refs/heads/main by this push:
new 379b0e0 Reduce delta inverted list preads (#11)
379b0e0 is described below
commit 379b0e041c4110fc32934f1031ef7e7d8711b8b1
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 8 17:40:24 2026 +0800
Reduce delta inverted list preads (#11)
---
core/src/io.rs | 135 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 128 insertions(+), 7 deletions(-)
diff --git a/core/src/io.rs b/core/src/io.rs
index be038db..eb53db9 100644
--- a/core/src/io.rs
+++ b/core/src/io.rs
@@ -340,6 +340,7 @@ pub fn write_index(index: &IVFPQIndex, out: &mut dyn SeekWrite) -> io::Result<()
let mut list_offsets = vec![0i64; nlist];
let mut list_counts = vec![0i32; nlist];
+ let mut list_id_bytes_lens = vec![0i32; nlist];
let mut current_offset = data_start;
for i in 0..nlist {
@@ -349,6 +350,13 @@ pub fn write_index(index: &IVFPQIndex, out: &mut dyn SeekWrite) -> io::Result<()
if count > 0 {
// base_id(8) + id_bytes_len(4) + id_bytes + codes
let id_bytes_len = sorted_lists[i].1.len();
+ if id_bytes_len > i32::MAX as usize {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "delta ID section exceeds i32 length limit",
+ ));
+ }
+ list_id_bytes_lens[i] = id_bytes_len as i32;
current_offset += 8 + 4 + id_bytes_len as u64 + (count * code_size) as u64;
}
}
@@ -357,7 +365,7 @@ pub fn write_index(index: &IVFPQIndex, out: &mut dyn SeekWrite) -> io::Result<()
for i in 0..nlist {
write_i64_le(out, list_offsets[i])?;
write_i32_le(out, list_counts[i])?;
- write_i32_le(out, 0)?;
+ write_i32_le(out, list_id_bytes_lens[i])?;
}
// Write inverted list data
@@ -479,6 +487,7 @@ pub struct IVFPQIndexReader<R: SeekRead> {
pub pq: ProductQuantizer,
pub list_offsets: Vec<i64>,
pub list_counts: Vec<i32>,
+ pub list_id_bytes_lens: Vec<i32>,
pub precomputed_table: Vec<f32>,
delta_ids: bool,
pub transposed_codes: bool,
@@ -589,6 +598,7 @@ impl<R: SeekRead> IVFPQIndexReader<R> {
},
list_offsets: Vec::new(),
list_counts: Vec::new(),
+ list_id_bytes_lens: Vec::new(),
precomputed_table: Vec::new(),
delta_ids,
transposed_codes,
@@ -652,6 +662,7 @@ impl<R: SeekRead> IVFPQIndexReader<R> {
self.list_offsets = vec![0i64; nlist];
self.list_counts = vec![0i32; nlist];
+ self.list_id_bytes_lens = vec![0i32; nlist];
for i in 0..nlist {
self.list_offsets[i] = read_i64_le(&mut self.reader)?;
let count = read_i32_le(&mut self.reader)?;
@@ -662,7 +673,14 @@ impl<R: SeekRead> IVFPQIndexReader<R> {
));
}
self.list_counts[i] = count;
- let _pad = read_i32_le(&mut self.reader)?;
+ let id_bytes_len = read_i32_le(&mut self.reader)?;
+ if id_bytes_len < 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("negative id_bytes_len {} at list {}", id_bytes_len, i),
+ ));
+ }
+ self.list_id_bytes_lens[i] = id_bytes_len;
}
self.loaded = true;
@@ -689,19 +707,54 @@ impl<R: SeekRead> IVFPQIndexReader<R> {
let code_bytes = checked_list_bytes(count, code_size)?;
if self.delta_ids {
- // Delta-varint format: [base_id: i64][id_bytes_len: i32][id_bytes...]
+ // Delta-varint format: [base_id: i64][id_bytes_len: i32][id_bytes...][codes...].
+ // Newer files also copy id_bytes_len into the offset table's reserved field,
+ // allowing a single positional read for the whole list. Older files keep that
+ // field as 0 and fall back to the two-read path below.
+ let id_bytes_len_from_table = self.list_id_bytes_lens[list_id];
+ if id_bytes_len_from_table > 0 {
+ let id_bytes_len = id_bytes_len_from_table as usize;
+ let payload_len = 12usize
+ .checked_add(id_bytes_len)
+ .and_then(|len| len.checked_add(code_bytes))
+ .ok_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::InvalidData,
+ "inverted list payload size overflow",
+ )
+ })?;
+ let mut payload = vec![0u8; payload_len];
+ self.reader.pread(offset, &mut payload)?;
+
+ let base_id = i64::from_le_bytes(payload[0..8].try_into().unwrap());
+ let encoded_id_bytes_len = i32::from_le_bytes(payload[8..12].try_into().unwrap());
+ if encoded_id_bytes_len != id_bytes_len_from_table {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!(
+ "offset table id_bytes_len {} does not match list header {}",
+ id_bytes_len_from_table, encoded_id_bytes_len
+ ),
+ ));
+ }
+ let id_bytes = &payload[12..12 + id_bytes_len];
+ let ids = decode_delta_varint_ids(base_id, id_bytes, count)?;
+ let codes = payload[12 + id_bytes_len..].to_vec();
+ return Ok((ids, codes));
+ }
+
let mut header = [0u8; 12];
self.reader.pread(offset, &mut header)?;
let base_id = i64::from_le_bytes(header[0..8].try_into().unwrap());
- let id_bytes_len_raw = i32::from_le_bytes(header[8..12].try_into().unwrap());
- if id_bytes_len_raw < 0 {
+ let id_bytes_len = i32::from_le_bytes(header[8..12].try_into().unwrap());
+ if id_bytes_len < 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"negative id_bytes_len",
));
}
- let id_bytes_len = id_bytes_len_raw as usize;
+ let id_bytes_len = id_bytes_len as usize;
let rest_len = id_bytes_len.checked_add(code_bytes).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
@@ -838,6 +891,11 @@ mod tests {
}
}
+ fn offset_table_start(d: usize, nlist: usize, m: usize, ksub: usize) -> usize {
+ let dsub = d / m;
+ HEADER_SIZE + (nlist * d + m * ksub * dsub) * 4
+ }
+
#[test]
fn test_varint_roundtrip() {
let mut buf = Vec::new();
@@ -943,6 +1001,10 @@ mod tests {
.iter()
.position(|&count| count > 0)
.unwrap();
+ assert!(
+ reader.list_id_bytes_lens[non_empty_list] > 0,
+ "new files should store id_bytes_len in the offset table"
+ );
let (read_ids, codes) = reader.read_inverted_list(non_empty_list).unwrap();
assert!(!read_ids.is_empty());
@@ -957,9 +1019,68 @@ mod tests {
stats.read_exact_calls, 0,
"reading a list should use positional reads after metadata is loaded"
);
+ assert_eq!(
+ stats.pread_calls, 1,
+ "delta-varint lists with offset-table id length should use one pread"
+ );
+ }
+
+ #[test]
+ fn test_read_inverted_list_falls_back_for_old_delta_offset_table() {
+ let d = 8;
+ let nlist = 2;
+ let m = 2;
+
+ let mut index = IVFPQIndex::new(d, nlist, m, MetricType::L2, false);
+ let n = 300;
+ let mut rng = rand::rngs::StdRng::seed_from_u64(42);
+ let data: Vec<f32> = (0..n * d).map(|_| rng.gen::<f32>()).collect();
+ let ids: Vec<i64> = (0..n as i64).collect();
+
+ index.train(&data, n);
+ index.add(&data, &ids, n);
+
+ let mut buf = Vec::new();
+ let mut writer = PosWriter::new(&mut buf);
+ write_index(&index, &mut writer).unwrap();
+
+ let offset_table_start = offset_table_start(d, nlist, m, 256);
+ for list_id in 0..nlist {
+ let id_bytes_len_offset = offset_table_start + list_id * 16 + 12;
+ buf[id_bytes_len_offset..id_bytes_len_offset + 4].copy_from_slice(&0i32.to_le_bytes());
+ }
+
+ let stats = Arc::new(Mutex::new(ReadStats::default()));
+ let stream = CountingPreadCursor::new(buf, Arc::clone(&stats));
+ let mut reader = IVFPQIndexReader::open(stream).unwrap();
+ reader.ensure_loaded().unwrap();
+
+ assert!(
+ reader.list_id_bytes_lens.iter().all(|&len| len == 0),
+ "old files leave the offset-table reserved field unset"
+ );
+
+ {
+ let mut stats = stats.lock().unwrap();
+ stats.seek_calls = 0;
+ stats.read_exact_calls = 0;
+ stats.pread_calls = 0;
+ }
+
+ let non_empty_list = reader
+ .list_counts
+ .iter()
+ .position(|&count| count > 0)
+ .unwrap();
+ let (read_ids, codes) = reader.read_inverted_list(non_empty_list).unwrap();
+
+ assert!(!read_ids.is_empty());
+ assert!(!codes.is_empty());
+
+ let stats = stats.lock().unwrap();
assert_eq!(
stats.pread_calls, 2,
- "delta-varint lists should read the fixed header and payload via pread"
+ "old delta-varint files should fall back to reading header then payload"
);
}