This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-vector-index.git


The following commit(s) were added to refs/heads/main by this push:
     new 74eddaf  Index disk shuffler partitions for direct reads (#10)
74eddaf is described below

commit 74eddafba46f45265ddcb9dd082757007527d8e2
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 8 16:38:53 2026 +0800

    Index disk shuffler partitions for direct reads (#10)
---
 core/src/shuffler.rs | 316 +++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 258 insertions(+), 58 deletions(-)

diff --git a/core/src/shuffler.rs b/core/src/shuffler.rs
index ac96d35..2237537 100644
--- a/core/src/shuffler.rs
+++ b/core/src/shuffler.rs
@@ -19,8 +19,11 @@
 //! Inspired by Lance's shuffler: write vectors sequentially with partition IDs,
 //! then read back grouped by partition for PQ encoding.
 
-use std::fs::File;
-use std::io::{self, BufReader, BufWriter, Read, Write};
+#[cfg(test)]
+use std::cell::Cell;
+use std::collections::{HashMap, VecDeque};
+use std::fs::{File, OpenOptions};
+use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicU64, Ordering};
 
@@ -28,16 +31,24 @@ type PartitionData = (Vec<Vec<i64>>, Vec<Vec<f32>>);
 
 /// Record format: [partition_id: u32][row_id: i64][vector: f32 * dim]
 const RECORD_OVERHEAD: usize = 4 + 8; // partition_id + row_id
+const MAX_OPEN_PARTITION_WRITERS: usize = 64;
 
 /// Disk-based shuffler that accumulates vectors with partition assignments,
 /// then reads them back grouped by partition.
 pub struct DiskShuffler {
     path: PathBuf,
-    writer: Option<BufWriter<File>>,
+    partition_paths: Vec<PathBuf>,
+    partition_writers: HashMap<usize, BufWriter<File>>,
+    writer_lru: VecDeque<usize>,
+    partition_opened: Vec<bool>,
     dim: usize,
     record_size: usize,
     count: usize,
     partition_counts: Vec<usize>,
+    partition_offsets: Vec<u64>,
+    #[cfg(test)]
+    bytes_read: Cell<u64>,
+    finalized: bool,
 }
 
 impl DiskShuffler {
@@ -47,16 +58,30 @@ impl DiskShuffler {
         let id = COUNTER.fetch_add(1, Ordering::Relaxed);
         let path =
             std::env::temp_dir().join(format!("ivfpq-shuffle-{}-{}.bin", 
std::process::id(), id));
-        let file = File::create(&path)?;
-        let writer = BufWriter::with_capacity(8 * 1024 * 1024, file);
+        let mut partition_paths = Vec::with_capacity(nlist);
+        for partition_id in 0..nlist {
+            partition_paths.push(std::env::temp_dir().join(format!(
+                "ivfpq-shuffle-{}-{}-part-{}.bin",
+                std::process::id(),
+                id,
+                partition_id
+            )));
+        }
 
         Ok(DiskShuffler {
             path,
-            writer: Some(writer),
+            partition_paths,
+            partition_writers: HashMap::new(),
+            writer_lru: VecDeque::new(),
+            partition_opened: vec![false; nlist],
             dim,
             record_size: RECORD_OVERHEAD + dim * 4,
             count: 0,
             partition_counts: vec![0; nlist],
+            partition_offsets: vec![0; nlist],
+            #[cfg(test)]
+            bytes_read: Cell::new(0),
+            finalized: false,
         })
     }
 
@@ -87,29 +112,58 @@ impl DiskShuffler {
                 ),
             ));
         }
-        let writer = self.writer.as_mut().unwrap();
+        if self.finalized {
+            return Err(io::Error::other("shuffler has already been 
finalized"));
+        }
+        let partition_idx = partition_id as usize;
+        let writer = self.partition_writer(partition_idx)?;
         writer.write_all(&partition_id.to_le_bytes())?;
         writer.write_all(&row_id.to_le_bytes())?;
         for &v in vector {
             writer.write_all(&v.to_le_bytes())?;
         }
-        self.partition_counts[partition_id as usize] += 1;
+        self.partition_counts[partition_idx] += 1;
         self.count += 1;
         Ok(())
     }
 
     /// Finalize writing and return partition counts.
     pub fn finish_write(&mut self) -> io::Result<()> {
-        if let Some(w) = self.writer.take() {
-            drop(w); // flush and close
+        if self.finalized {
+            return Ok(());
         }
+
+        for writer in self.partition_writers.values_mut() {
+            writer.flush()?;
+        }
+        self.partition_writers.clear();
+        self.writer_lru.clear();
+
+        let mut out = BufWriter::with_capacity(8 * 1024 * 1024, 
File::create(&self.path)?);
+        let mut offset = 0u64;
+        for partition_id in 0..self.partition_counts.len() {
+            self.partition_offsets[partition_id] = offset;
+            if self.partition_counts[partition_id] == 0 {
+                continue;
+            }
+            let mut input = BufReader::with_capacity(
+                8 * 1024 * 1024,
+                File::open(&self.partition_paths[partition_id])?,
+            );
+            let copied = io::copy(&mut input, &mut out)?;
+            offset += copied;
+        }
+        out.flush()?;
+        self.finalized = true;
         Ok(())
     }
 
     /// Read all vectors for a specific partition.
     /// Returns (row_ids, vectors) where vectors is flat [count * dim].
     pub fn read_partition(&self, partition_id: u32) -> io::Result<(Vec<i64>, 
Vec<f32>)> {
-        let count = self.partition_counts[partition_id as usize];
+        self.validate_finalized()?;
+        let partition_id = self.validate_partition_id(partition_id)?;
+        let count = self.partition_counts[partition_id];
         if count == 0 {
             return Ok((Vec::new(), Vec::new()));
         }
@@ -117,37 +171,17 @@ impl DiskShuffler {
         let mut ids = Vec::with_capacity(count);
         let mut vectors = Vec::with_capacity(count * self.dim);
 
-        let file = File::open(&self.path)?;
+        let mut file = File::open(&self.path)?;
+        file.seek(SeekFrom::Start(self.partition_offsets[partition_id]))?;
         let mut reader = BufReader::with_capacity(8 * 1024 * 1024, file);
         let mut record_buf = vec![0u8; self.record_size];
 
-        for _ in 0..self.count {
+        for _ in 0..count {
             reader.read_exact(&mut record_buf)?;
-            let pid =
-                u32::from_le_bytes([record_buf[0], record_buf[1], 
record_buf[2], record_buf[3]]);
-            if pid == partition_id {
-                let row_id = i64::from_le_bytes([
-                    record_buf[4],
-                    record_buf[5],
-                    record_buf[6],
-                    record_buf[7],
-                    record_buf[8],
-                    record_buf[9],
-                    record_buf[10],
-                    record_buf[11],
-                ]);
-                ids.push(row_id);
-                for i in 0..self.dim {
-                    let off = RECORD_OVERHEAD + i * 4;
-                    let v = f32::from_le_bytes([
-                        record_buf[off],
-                        record_buf[off + 1],
-                        record_buf[off + 2],
-                        record_buf[off + 3],
-                    ]);
-                    vectors.push(v);
-                }
-            }
+            self.add_bytes_read(self.record_size as u64);
+            let row_id = decode_row_id(&record_buf);
+            ids.push(row_id);
+            decode_vector(&record_buf, self.dim, &mut vectors);
         }
 
         Ok((ids, vectors))
@@ -156,6 +190,7 @@ impl DiskShuffler {
     /// Read all partitions at once (for moderate datasets that fit in memory after PQ encoding).
     /// Returns (ids_per_list, vectors_per_list).
     pub fn read_all_partitions(&self) -> io::Result<PartitionData> {
+        self.validate_finalized()?;
         let nlist = self.partition_counts.len();
         let mut all_ids: Vec<Vec<i64>> = vec![Vec::new(); nlist];
         let mut all_vectors: Vec<Vec<f32>> = vec![Vec::new(); nlist];
@@ -172,30 +207,13 @@ impl DiskShuffler {
 
         for _ in 0..self.count {
             reader.read_exact(&mut record_buf)?;
+            self.add_bytes_read(self.record_size as u64);
             let pid =
                 u32::from_le_bytes([record_buf[0], record_buf[1], 
record_buf[2], record_buf[3]])
                     as usize;
-            let row_id = i64::from_le_bytes([
-                record_buf[4],
-                record_buf[5],
-                record_buf[6],
-                record_buf[7],
-                record_buf[8],
-                record_buf[9],
-                record_buf[10],
-                record_buf[11],
-            ]);
+            let row_id = decode_row_id(&record_buf);
             all_ids[pid].push(row_id);
-            for i in 0..self.dim {
-                let off = RECORD_OVERHEAD + i * 4;
-                let v = f32::from_le_bytes([
-                    record_buf[off],
-                    record_buf[off + 1],
-                    record_buf[off + 2],
-                    record_buf[off + 3],
-                ]);
-                all_vectors[pid].push(v);
-            }
+            decode_vector(&record_buf, self.dim, &mut all_vectors[pid]);
         }
 
         Ok((all_ids, all_vectors))
@@ -208,11 +226,120 @@ impl DiskShuffler {
     pub fn partition_counts(&self) -> &[usize] {
         &self.partition_counts
     }
+
+    #[cfg(test)]
+    fn bytes_read(&self) -> u64 {
+        self.bytes_read.get()
+    }
+
+    fn validate_partition_id(&self, partition_id: u32) -> io::Result<usize> {
+        let partition_id = partition_id as usize;
+        if partition_id >= self.partition_counts.len() {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!(
+                    "partition_id {} out of range (nlist={})",
+                    partition_id,
+                    self.partition_counts.len()
+                ),
+            ));
+        }
+        Ok(partition_id)
+    }
+
+    fn validate_finalized(&self) -> io::Result<()> {
+        if !self.finalized {
+            return Err(io::Error::other(
+                "finish_write must be called before reading",
+            ));
+        }
+        Ok(())
+    }
+
+    #[cfg(test)]
+    fn add_bytes_read(&self, bytes: u64) {
+        self.bytes_read.set(self.bytes_read.get() + bytes);
+    }
+
+    #[cfg(not(test))]
+    fn add_bytes_read(&self, _bytes: u64) {}
+
+    fn partition_writer(&mut self, partition_id: usize) -> io::Result<&mut 
BufWriter<File>> {
+        if !self.partition_writers.contains_key(&partition_id) {
+            if self.partition_writers.len() == MAX_OPEN_PARTITION_WRITERS {
+                if let Some(evicted_partition_id) = 
self.writer_lru.pop_front() {
+                    if let Some(mut writer) = 
self.partition_writers.remove(&evicted_partition_id) {
+                        writer.flush()?;
+                    }
+                }
+            }
+
+            let file = if self.partition_opened[partition_id] {
+                OpenOptions::new()
+                    .create(true)
+                    .append(true)
+                    .open(&self.partition_paths[partition_id])?
+            } else {
+                self.partition_opened[partition_id] = true;
+                OpenOptions::new()
+                    .create(true)
+                    .write(true)
+                    .truncate(true)
+                    .open(&self.partition_paths[partition_id])?
+            };
+            self.partition_writers.insert(
+                partition_id,
+                BufWriter::with_capacity(8 * 1024 * 1024, file),
+            );
+        }
+
+        if let Some(pos) = self
+            .writer_lru
+            .iter()
+            .position(|&cached_partition_id| cached_partition_id == 
partition_id)
+        {
+            self.writer_lru.remove(pos);
+        }
+        self.writer_lru.push_back(partition_id);
+
+        self.partition_writers
+            .get_mut(&partition_id)
+            .ok_or_else(|| io::Error::other("failed to open partition writer"))
+    }
 }
 
 impl Drop for DiskShuffler {
     fn drop(&mut self) {
         let _ = std::fs::remove_file(&self.path);
+        for path in &self.partition_paths {
+            let _ = std::fs::remove_file(path);
+        }
+    }
+}
+
+fn decode_row_id(record_buf: &[u8]) -> i64 {
+    i64::from_le_bytes([
+        record_buf[4],
+        record_buf[5],
+        record_buf[6],
+        record_buf[7],
+        record_buf[8],
+        record_buf[9],
+        record_buf[10],
+        record_buf[11],
+    ])
+}
+
+fn decode_vector(record_buf: &[u8], dim: usize, out: &mut Vec<f32>) {
+    for i in 0..dim {
+        let off = RECORD_OVERHEAD + i * 4;
+        let v = f32::from_le_bytes([
+            record_buf[off],
+            record_buf[off + 1],
+            record_buf[off + 2],
+            record_buf[off + 3],
+        ]);
+        out.push(v);
     }
 }
 
@@ -236,6 +363,50 @@ mod tests {
         assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
     }
 
+    #[test]
+    fn test_new_does_not_open_one_file_per_partition() {
+        let shuffler = DiskShuffler::new(4, 1024).unwrap();
+
+        assert!(shuffler.partition_writers.is_empty());
+        assert_eq!(shuffler.partition_counts().len(), 1024);
+    }
+
+    #[test]
+    fn test_write_vector_limits_open_partition_writers() {
+        let nlist = MAX_OPEN_PARTITION_WRITERS + 8;
+        let mut shuffler = DiskShuffler::new(4, nlist).unwrap();
+
+        for partition_id in 0..nlist {
+            shuffler
+                .write_vector(
+                    partition_id as u32,
+                    partition_id as i64,
+                    &[1.0, 2.0, 3.0, 4.0],
+                )
+                .unwrap();
+        }
+
+        assert_eq!(shuffler.partition_writers.len(), 
MAX_OPEN_PARTITION_WRITERS);
+    }
+
+    #[test]
+    fn test_read_partition_requires_finish_write() {
+        let mut shuffler = DiskShuffler::new(4, 2).unwrap();
+        shuffler.write_vector(0, 1, &[1.0, 2.0, 3.0, 4.0]).unwrap();
+
+        let err = shuffler.read_partition(0).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::Other);
+    }
+
+    #[test]
+    fn test_read_partition_validates_partition_id() {
+        let mut shuffler = DiskShuffler::new(4, 2).unwrap();
+        shuffler.finish_write().unwrap();
+
+        let err = shuffler.read_partition(5).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+    }
+
     #[test]
     fn test_shuffler_roundtrip() {
         let dim = 4;
@@ -273,4 +444,33 @@ mod tests {
         assert_eq!(all_ids[2], vec![400]);
         assert_eq!(&all_vecs[1][..], &[5.0, 6.0, 7.0, 8.0]);
     }
+
+    #[test]
+    fn test_read_partition_uses_partition_index() {
+        let dim = 2;
+        let nlist = 3;
+        let mut shuffler = DiskShuffler::new(dim, nlist).unwrap();
+
+        for i in 0..10 {
+            shuffler
+                .write_vector((i % nlist) as u32, i as i64, &[i as f32, i as 
f32 + 0.5])
+                .unwrap();
+        }
+        shuffler.finish_write().unwrap();
+
+        let partition_id = 1;
+        let partition_count = shuffler.partition_counts()[partition_id] as u64;
+        let expected_bytes = partition_count * shuffler.record_size as u64;
+
+        let before = shuffler.bytes_read();
+        let (ids, vectors) = shuffler.read_partition(partition_id as 
u32).unwrap();
+        let bytes_read = shuffler.bytes_read() - before;
+
+        assert_eq!(ids, vec![1, 4, 7]);
+        assert_eq!(vectors.len(), ids.len() * dim);
+        assert_eq!(
+            bytes_read, expected_bytes,
+            "read_partition should read only the selected partition's records"
+        );
+    }
 }

Reply via email to