This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new c838f62  Optimize projected read buffer copies (#5)
c838f62 is described below

commit c838f628dbf90d30a7f559f81739e5dac750bf95
Author: QuakeWang <[email protected]>
AuthorDate: Tue May 19 10:14:33 2026 +0800

    Optimize projected read buffer copies (#5)
---
 core/src/bucket_reader.rs |  33 ++++++-
 core/src/reader.rs        | 228 +++++++++++++++++++++++++++++++---------------
 core/src/reader_tests.rs  |  48 ++++++++++
 3 files changed, 235 insertions(+), 74 deletions(-)

diff --git a/core/src/bucket_reader.rs b/core/src/bucket_reader.rs
index 4a782ce..495cae1 100644
--- a/core/src/bucket_reader.rs
+++ b/core/src/bucket_reader.rs
@@ -565,6 +565,33 @@ impl ColumnPageReader {
         page_data: Vec<u8>,
         num_rows: usize,
     ) -> io::Result<Self> {
+        Self::new_with_page_data_start(
+            col_type,
+            encoding,
+            has_nulls,
+            const_value,
+            page_data,
+            0,
+            num_rows,
+        )
+    }
+
+    pub(crate) fn new_with_page_data_start(
+        col_type: DataType,
+        encoding: u8,
+        has_nulls: bool,
+        const_value: Value,
+        data: Vec<u8>,
+        page_data_start: usize,
+        num_rows: usize,
+    ) -> io::Result<Self> {
+        if page_data_start > data.len() {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "column page data start out of bounds",
+            ));
+        }
+
         let mut reader = ColumnPageReader {
             col_type,
             encoding,
@@ -573,8 +600,8 @@ impl ColumnPageReader {
             dict_values: Vec::new(),
             dict_bit_width: 0,
             null_bitmap: Vec::new(),
-            data: page_data,
-            data_cursor: 0,
+            data,
+            data_cursor: page_data_start,
             num_rows,
         };
         reader.init_page()?;
@@ -583,7 +610,7 @@ impl ColumnPageReader {
 
     fn init_page(&mut self) -> io::Result<()> {
         let null_bitmap_bytes = self.num_rows.div_ceil(8);
-        let mut pos = 0;
+        let mut pos = self.data_cursor;
 
         match self.encoding {
             ENCODING_ALL_NULL => {}
diff --git a/core/src/reader.rs b/core/src/reader.rs
index 76b0697..be56a76 100644
--- a/core/src/reader.rs
+++ b/core/src/reader.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::io;
+use std::sync::Arc;
 
 use arrow_array::ArrayRef;
 use arrow_array::RecordBatch;
@@ -32,6 +33,38 @@ use crate::varint;
 const COALESCE_GAP: u64 = 1024 * 1024;
 const COALESCE_MAX_RANGE: u64 = 32 * 1024 * 1024;
 
+#[derive(Clone)]
+pub struct ReadRangeBuffer {
+    data: Arc<Vec<u8>>,
+    start: usize,
+    len: usize,
+}
+
+impl ReadRangeBuffer {
+    pub fn new(data: Arc<Vec<u8>>, start: usize, len: usize) -> io::Result<Self> {
+        if start.checked_add(len).is_none_or(|end| end > data.len()) {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "read range buffer bounds exceed backing data",
+            ));
+        }
+
+        Ok(Self { data, start, len })
+    }
+
+    pub fn as_slice(&self) -> &[u8] {
+        &self.data[self.start..self.start + self.len]
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+}
+
 /// A random-access file abstraction for reading Mosaic files.
 ///
 /// The `Sync` bound is required because the reader may call `read_at` from
@@ -44,64 +77,30 @@ pub trait InputFile: Sync {
     /// This method must be safe to call concurrently from multiple threads.
     fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()>;
 
-    fn read_ranges(&self, ranges: &[(u64, usize)]) -> io::Result<Vec<Vec<u8>>> {
-        if ranges.is_empty() {
-            return Ok(Vec::new());
-        }
-
-        // Build sorted index
-        let mut indices: Vec<usize> = (0..ranges.len()).collect();
-        indices.sort_unstable_by_key(|&i| ranges[i].0);
-
-        // Merge ranges with gap <= COALESCE_GAP and total size <= COALESCE_MAX_RANGE
-        struct MergedRange {
-            start: u64,
-            end: u64,
-            members: Vec<usize>, // original indices
-        }
-
-        let mut merged: Vec<MergedRange> = Vec::new();
-        for &idx in &indices {
-            let (offset, len) = ranges[idx];
-            let range_end = offset + len as u64;
+    fn read_ranges_shared(&self, ranges: &[(u64, usize)]) -> io::Result<Vec<ReadRangeBuffer>> {
+        let (merged, fetched) = read_merged_ranges(self, ranges)?;
+        let fetched: Vec<Arc<Vec<u8>>> = fetched.into_iter().map(Arc::new).collect();
 
-            let should_merge = if let Some(last) = merged.last() {
-                offset >= last.start
-                    && offset.saturating_sub(last.end) <= COALESCE_GAP
-                    && (range_end - last.start) <= COALESCE_MAX_RANGE
-            } else {
-                false
-            };
-
-            if should_merge {
-                let last = merged.last_mut().unwrap();
-                last.end = last.end.max(range_end);
-                last.members.push(idx);
-            } else {
-                merged.push(MergedRange {
-                    start: offset,
-                    end: range_end,
-                    members: vec![idx],
-                });
+        // Distribute views back to original order
+        let mut results: Vec<Option<ReadRangeBuffer>> = Vec::with_capacity(ranges.len());
+        results.resize_with(ranges.len(), || None);
+        for (mi, mr) in merged.iter().enumerate() {
+            let data = fetched[mi].clone();
+            for &idx in &mr.members {
+                let (offset, len) = ranges[idx];
+                let rel_start = (offset - mr.start) as usize;
+                results[idx] = Some(ReadRangeBuffer::new(data.clone(), rel_start, len)?);
             }
         }
 
-        // Fetch merged ranges in parallel
-        let fetched: Vec<io::Result<Vec<u8>>> = std::thread::scope(|s| {
-            let handles: Vec<_> = merged
-                .iter()
-                .map(|mr| {
-                    s.spawn(|| {
-                        let len = (mr.end - mr.start) as usize;
-                        let mut buf = vec![0u8; len];
-                        self.read_at(mr.start, &mut buf)?;
-                        Ok(buf)
-                    })
-                })
-                .collect();
-            handles.into_iter().map(|h| h.join().unwrap()).collect()
-        });
-        let fetched: Vec<Vec<u8>> = fetched.into_iter().collect::<io::Result<_>>()?;
+        results
+            .into_iter()
+            .collect::<Option<Vec<_>>>()
+            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing range buffer"))
+    }
+
+    fn read_ranges(&self, ranges: &[(u64, usize)]) -> io::Result<Vec<Vec<u8>>> {
+        let (merged, fetched) = read_merged_ranges(self, ranges)?;
 
         // Distribute slices back to original order
         let mut results: Vec<Vec<u8>> = Vec::with_capacity(ranges.len());
@@ -119,6 +118,68 @@ pub trait InputFile: Sync {
     }
 }
 
+struct MergedRange {
+    start: u64,
+    end: u64,
+    members: Vec<usize>,
+}
+
+fn read_merged_ranges<I: InputFile + ?Sized>(
+    input: &I,
+    ranges: &[(u64, usize)],
+) -> io::Result<(Vec<MergedRange>, Vec<Vec<u8>>)> {
+    if ranges.is_empty() {
+        return Ok((Vec::new(), Vec::new()));
+    }
+
+    let mut indices: Vec<usize> = (0..ranges.len()).collect();
+    indices.sort_unstable_by_key(|&i| ranges[i].0);
+
+    let mut merged: Vec<MergedRange> = Vec::new();
+    for &idx in &indices {
+        let (offset, len) = ranges[idx];
+        let range_end = offset + len as u64;
+
+        let should_merge = if let Some(last) = merged.last() {
+            offset >= last.start
+                && offset.saturating_sub(last.end) <= COALESCE_GAP
+                && (range_end - last.start) <= COALESCE_MAX_RANGE
+        } else {
+            false
+        };
+
+        if should_merge {
+            let last = merged.last_mut().unwrap();
+            last.end = last.end.max(range_end);
+            last.members.push(idx);
+        } else {
+            merged.push(MergedRange {
+                start: offset,
+                end: range_end,
+                members: vec![idx],
+            });
+        }
+    }
+
+    let fetched: Vec<io::Result<Vec<u8>>> = std::thread::scope(|s| {
+        let handles: Vec<_> = merged
+            .iter()
+            .map(|mr| {
+                s.spawn(|| {
+                    let len = (mr.end - mr.start) as usize;
+                    let mut buf = vec![0u8; len];
+                    input.read_at(mr.start, &mut buf)?;
+                    Ok(buf)
+                })
+            })
+            .collect();
+        handles.into_iter().map(|h| h.join().unwrap()).collect()
+    });
+    let fetched = fetched.into_iter().collect::<io::Result<Vec<_>>>()?;
+
+    Ok((merged, fetched))
+}
+
 pub struct RowGroupMeta {
     pub num_rows: usize,
     pub bucket_offsets: Vec<u64>,
@@ -382,14 +443,13 @@ impl<I: InputFile> MosaicReader<I> {
             }
         }
 
-        let page_data = page_content[ppos..].to_vec();
-
-        ColumnPageReader::new(
+        ColumnPageReader::new_with_page_data_start(
             col_type.clone(),
             encoding,
             has_nulls,
             const_value,
-            page_data,
+            page_content,
+            ppos,
             num_rows,
         )
     }
@@ -523,7 +583,7 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
         }
 
         // Round 1: batch read all directories + monolithic blobs
-        let r1_buffers = self.input.read_ranges(&r1_ranges)?;
+        let r1_buffers = self.input.read_ranges_shared(&r1_ranges)?;
 
         // Process Round 1 results, build Round 2 ranges for paged bucket slots
         let mut bucket_states: Vec<Option<BucketState>> =
@@ -541,14 +601,14 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
         let mut paged_dir_info: Vec<Option<(Vec<usize>, Vec<u64>)>> = vec![None; self.num_buckets];
 
         for (ri, &b) in r1_bucket_ids.iter().enumerate() {
-            let buf = &r1_buffers[ri];
+            let buf = r1_buffers[ri].as_slice();
             match bucket_kinds[b] {
                 BucketLayout::Monolithic {
                     uncompressed_size, ..
                 } => {
                     let global_indices = &self.schema.bucket_to_global[b];
                     let bucket_data = match self.compression {
-                        COMPRESSION_NONE => buf.clone(),
+                        COMPRESSION_NONE => buf.to_vec(),
                         COMPRESSION_ZSTD => zstd::bulk::decompress(buf, uncompressed_size)
                             .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
                         _ => {
@@ -670,28 +730,47 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
 
         // Round 2: batch read all paged column slots
         if !r2_ranges.is_empty() {
-            let r2_buffers = self.input.read_ranges(&r2_ranges)?;
+            let r2_buffers = self.input.read_ranges_shared(&r2_ranges)?;
 
-            // Distribute slot data to per-bucket column readers
-            // First, build a per-bucket map of col_idx -> slot_data
-            let mut paged_slot_data: Vec<Vec<Option<Vec<u8>>>> =
+            struct SlotLocation {
+                group_idx: usize,
+                start: usize,
+                len: usize,
+            }
+
+            let mut slot_locations: Vec<Vec<Option<SlotLocation>>> =
                 Vec::with_capacity(self.num_buckets);
             for b in 0..self.num_buckets {
                 let n = self.schema.bucket_to_global[b].len();
-                paged_slot_data.push(vec![None; n]);
+                slot_locations.push((0..n).map(|_| None).collect());
             }
 
             for (group_idx, group) in r2_group_infos.iter().enumerate() {
-                let buf = &r2_buffers[group_idx];
+                let buf = r2_buffers[group_idx].as_slice();
                 let group_base = r2_ranges[group_idx].0;
                 for info in group {
                     let (_, ref slot_file_offsets) =
                         paged_dir_info[info.bucket_id].as_ref().unwrap();
                     let (ref slot_sizes, _) = paged_dir_info[info.bucket_id].as_ref().unwrap();
                     let rel_start = (slot_file_offsets[info.col_idx] - group_base) as usize;
-                    let rel_end = rel_start + slot_sizes[info.col_idx];
-                    paged_slot_data[info.bucket_id][info.col_idx] =
-                        Some(buf[rel_start..rel_end].to_vec());
+                    let slot_len = slot_sizes[info.col_idx];
+                    let Some(rel_end) = rel_start.checked_add(slot_len) else {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "paged bucket: slot range overflows read buffer",
+                        ));
+                    };
+                    if rel_end > buf.len() {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "paged bucket: slot range exceeds read buffer",
+                        ));
+                    }
+                    slot_locations[info.bucket_id][info.col_idx] = Some(SlotLocation {
+                        group_idx,
+                        start: rel_start,
+                        len: slot_len,
+                    });
                 }
             }
 
@@ -727,7 +806,14 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
                         continue;
                     }
 
-                    let slot_data = paged_slot_data[b][i].as_ref().unwrap();
+                    let location = slot_locations[b][i].as_ref().ok_or_else(|| {
+                        io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "paged bucket: missing projected slot data",
+                        )
+                    })?;
+                    let group_buffer = r2_buffers[location.group_idx].as_slice();
+                    let slot_data = &group_buffer[location.start..location.start + location.len];
                     let column_reader =
                         Self::parse_column_slot(slot_data, &col_type, meta.num_rows)?;
                     column_readers.push(Some(column_reader));
diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index 243360b..2c21483 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -2828,6 +2828,54 @@ fn test_read_ranges_coalesces_adjacent() {
     assert_eq!(input.read_count.load(Ordering::Relaxed), 1);
 }
 
+#[test]
+fn test_read_ranges_shared_reuses_coalesced_buffer() {
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
+    struct CountingInputFile {
+        data: Vec<u8>,
+        read_count: AtomicUsize,
+    }
+
+    impl InputFile for CountingInputFile {
+        fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
+            self.read_count.fetch_add(1, Ordering::Relaxed);
+            let start = offset as usize;
+            buf.copy_from_slice(&self.data[start..start + buf.len()]);
+            Ok(())
+        }
+    }
+
+    let data: Vec<u8> = (0..512).map(|i| (i % 256) as u8).collect();
+    let input = CountingInputFile {
+        data: data.clone(),
+        read_count: AtomicUsize::new(0),
+    };
+
+    let ranges = vec![(0u64, 64usize), (64, 64), (200, 32)];
+    let results = input.read_ranges_shared(&ranges).unwrap();
+
+    assert_eq!(input.read_count.load(Ordering::Relaxed), 1);
+    assert_eq!(results[0].as_slice(), &data[0..64]);
+    assert_eq!(results[1].as_slice(), &data[64..128]);
+    assert_eq!(results[2].as_slice(), &data[200..232]);
+    assert!(Arc::ptr_eq(&results[0].data, &results[1].data));
+    assert!(Arc::ptr_eq(&results[0].data, &results[2].data));
+}
+
+#[test]
+fn test_read_range_buffer_new_validates_bounds() {
+    let data = Arc::new(vec![1, 2, 3, 4]);
+
+    let buffer = ReadRangeBuffer::new(data.clone(), 1, 2).unwrap();
+    assert_eq!(buffer.as_slice(), &[2, 3]);
+
+    match ReadRangeBuffer::new(data, 3, 2) {
+        Ok(_) => panic!("expected out-of-bounds range to fail"),
+        Err(err) => assert_eq!(err.kind(), io::ErrorKind::InvalidInput),
+    }
+}
+
 #[test]
 fn test_read_ranges_splits_large_gap() {
     use std::sync::atomic::{AtomicUsize, Ordering};

Reply via email to