This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-vector-index.git


The following commit(s) were added to refs/heads/main by this push:
     new 379b0e0  Reduce delta inverted list preads (#11)
379b0e0 is described below

commit 379b0e041c4110fc32934f1031ef7e7d8711b8b1
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 8 17:40:24 2026 +0800

    Reduce delta inverted list preads (#11)
---
 core/src/io.rs | 135 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 128 insertions(+), 7 deletions(-)

diff --git a/core/src/io.rs b/core/src/io.rs
index be038db..eb53db9 100644
--- a/core/src/io.rs
+++ b/core/src/io.rs
@@ -340,6 +340,7 @@ pub fn write_index(index: &IVFPQIndex, out: &mut dyn SeekWrite) -> io::Result<()
 
     let mut list_offsets = vec![0i64; nlist];
     let mut list_counts = vec![0i32; nlist];
+    let mut list_id_bytes_lens = vec![0i32; nlist];
     let mut current_offset = data_start;
 
     for i in 0..nlist {
@@ -349,6 +350,13 @@ pub fn write_index(index: &IVFPQIndex, out: &mut dyn SeekWrite) -> io::Result<()
         if count > 0 {
             // base_id(8) + id_bytes_len(4) + id_bytes + codes
             let id_bytes_len = sorted_lists[i].1.len();
+            if id_bytes_len > i32::MAX as usize {
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    "delta ID section exceeds i32 length limit",
+                ));
+            }
+            list_id_bytes_lens[i] = id_bytes_len as i32;
             current_offset += 8 + 4 + id_bytes_len as u64 + (count * code_size) as u64;
         }
     }
@@ -357,7 +365,7 @@ pub fn write_index(index: &IVFPQIndex, out: &mut dyn SeekWrite) -> io::Result<()
     for i in 0..nlist {
         write_i64_le(out, list_offsets[i])?;
         write_i32_le(out, list_counts[i])?;
-        write_i32_le(out, 0)?;
+        write_i32_le(out, list_id_bytes_lens[i])?;
     }
 
     // Write inverted list data
@@ -479,6 +487,7 @@ pub struct IVFPQIndexReader<R: SeekRead> {
     pub pq: ProductQuantizer,
     pub list_offsets: Vec<i64>,
     pub list_counts: Vec<i32>,
+    pub list_id_bytes_lens: Vec<i32>,
     pub precomputed_table: Vec<f32>,
     delta_ids: bool,
     pub transposed_codes: bool,
@@ -589,6 +598,7 @@ impl<R: SeekRead> IVFPQIndexReader<R> {
             },
             list_offsets: Vec::new(),
             list_counts: Vec::new(),
+            list_id_bytes_lens: Vec::new(),
             precomputed_table: Vec::new(),
             delta_ids,
             transposed_codes,
@@ -652,6 +662,7 @@ impl<R: SeekRead> IVFPQIndexReader<R> {
 
         self.list_offsets = vec![0i64; nlist];
         self.list_counts = vec![0i32; nlist];
+        self.list_id_bytes_lens = vec![0i32; nlist];
         for i in 0..nlist {
             self.list_offsets[i] = read_i64_le(&mut self.reader)?;
             let count = read_i32_le(&mut self.reader)?;
@@ -662,7 +673,14 @@ impl<R: SeekRead> IVFPQIndexReader<R> {
                 ));
             }
             self.list_counts[i] = count;
-            let _pad = read_i32_le(&mut self.reader)?;
+            let id_bytes_len = read_i32_le(&mut self.reader)?;
+            if id_bytes_len < 0 {
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    format!("negative id_bytes_len {} at list {}", id_bytes_len, i),
+                ));
+            }
+            self.list_id_bytes_lens[i] = id_bytes_len;
         }
 
         self.loaded = true;
@@ -689,19 +707,54 @@ impl<R: SeekRead> IVFPQIndexReader<R> {
         let code_bytes = checked_list_bytes(count, code_size)?;
 
         if self.delta_ids {
-            // Delta-varint format: [base_id: i64][id_bytes_len: i32][id_bytes...]
+            // Delta-varint format: [base_id: i64][id_bytes_len: i32][id_bytes...][codes...].
+            // Newer files also copy id_bytes_len into the offset table's reserved field,
+            // allowing a single positional read for the whole list. Older files keep that
+            // field as 0 and fall back to the two-read path below.
+            let id_bytes_len_from_table = self.list_id_bytes_lens[list_id];
+            if id_bytes_len_from_table > 0 {
+                let id_bytes_len = id_bytes_len_from_table as usize;
+                let payload_len = 12usize
+                    .checked_add(id_bytes_len)
+                    .and_then(|len| len.checked_add(code_bytes))
+                    .ok_or_else(|| {
+                        io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "inverted list payload size overflow",
+                        )
+                    })?;
+                let mut payload = vec![0u8; payload_len];
+                self.reader.pread(offset, &mut payload)?;
+
+                let base_id = i64::from_le_bytes(payload[0..8].try_into().unwrap());
+                let encoded_id_bytes_len = i32::from_le_bytes(payload[8..12].try_into().unwrap());
+                if encoded_id_bytes_len != id_bytes_len_from_table {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        format!(
+                            "offset table id_bytes_len {} does not match list header {}",
+                            id_bytes_len_from_table, encoded_id_bytes_len
+                        ),
+                    ));
+                }
+                let id_bytes = &payload[12..12 + id_bytes_len];
+                let ids = decode_delta_varint_ids(base_id, id_bytes, count)?;
+                let codes = payload[12 + id_bytes_len..].to_vec();
+                return Ok((ids, codes));
+            }
+
             let mut header = [0u8; 12];
             self.reader.pread(offset, &mut header)?;
 
             let base_id = i64::from_le_bytes(header[0..8].try_into().unwrap());
-            let id_bytes_len_raw = i32::from_le_bytes(header[8..12].try_into().unwrap());
-            if id_bytes_len_raw < 0 {
+            let id_bytes_len = i32::from_le_bytes(header[8..12].try_into().unwrap());
+            if id_bytes_len < 0 {
                 return Err(io::Error::new(
                     io::ErrorKind::InvalidData,
                     "negative id_bytes_len",
                 ));
             }
-            let id_bytes_len = id_bytes_len_raw as usize;
+            let id_bytes_len = id_bytes_len as usize;
             let rest_len = id_bytes_len.checked_add(code_bytes).ok_or_else(|| {
                 io::Error::new(
                     io::ErrorKind::InvalidData,
@@ -838,6 +891,11 @@ mod tests {
         }
     }
 
+    fn offset_table_start(d: usize, nlist: usize, m: usize, ksub: usize) -> usize {
+        let dsub = d / m;
+        HEADER_SIZE + (nlist * d + m * ksub * dsub) * 4
+    }
+
     #[test]
     fn test_varint_roundtrip() {
         let mut buf = Vec::new();
@@ -943,6 +1001,10 @@ mod tests {
             .iter()
             .position(|&count| count > 0)
             .unwrap();
+        assert!(
+            reader.list_id_bytes_lens[non_empty_list] > 0,
+            "new files should store id_bytes_len in the offset table"
+        );
         let (read_ids, codes) = reader.read_inverted_list(non_empty_list).unwrap();
 
         assert!(!read_ids.is_empty());
@@ -957,9 +1019,68 @@ mod tests {
             stats.read_exact_calls, 0,
             "reading a list should use positional reads after metadata is loaded"
         );
+        assert_eq!(
+            stats.pread_calls, 1,
+            "delta-varint lists with offset-table id length should use one pread"
+        );
+    }
+
+    #[test]
+    fn test_read_inverted_list_falls_back_for_old_delta_offset_table() {
+        let d = 8;
+        let nlist = 2;
+        let m = 2;
+
+        let mut index = IVFPQIndex::new(d, nlist, m, MetricType::L2, false);
+        let n = 300;
+        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
+        let data: Vec<f32> = (0..n * d).map(|_| rng.gen::<f32>()).collect();
+        let ids: Vec<i64> = (0..n as i64).collect();
+
+        index.train(&data, n);
+        index.add(&data, &ids, n);
+
+        let mut buf = Vec::new();
+        let mut writer = PosWriter::new(&mut buf);
+        write_index(&index, &mut writer).unwrap();
+
+        let offset_table_start = offset_table_start(d, nlist, m, 256);
+        for list_id in 0..nlist {
+            let id_bytes_len_offset = offset_table_start + list_id * 16 + 12;
+            buf[id_bytes_len_offset..id_bytes_len_offset + 4].copy_from_slice(&0i32.to_le_bytes());
+        }
+
+        let stats = Arc::new(Mutex::new(ReadStats::default()));
+        let stream = CountingPreadCursor::new(buf, Arc::clone(&stats));
+        let mut reader = IVFPQIndexReader::open(stream).unwrap();
+        reader.ensure_loaded().unwrap();
+
+        assert!(
+            reader.list_id_bytes_lens.iter().all(|&len| len == 0),
+            "old files leave the offset-table reserved field unset"
+        );
+
+        {
+            let mut stats = stats.lock().unwrap();
+            stats.seek_calls = 0;
+            stats.read_exact_calls = 0;
+            stats.pread_calls = 0;
+        }
+
+        let non_empty_list = reader
+            .list_counts
+            .iter()
+            .position(|&count| count > 0)
+            .unwrap();
+        let (read_ids, codes) = reader.read_inverted_list(non_empty_list).unwrap();
+
+        assert!(!read_ids.is_empty());
+        assert!(!codes.is_empty());
+
+        let stats = stats.lock().unwrap();
         assert_eq!(
             stats.pread_calls, 2,
-            "delta-varint lists should read the fixed header and payload via pread"
+            "old delta-varint files should fall back to reading header then payload"
         );
     }
 

Reply via email to