This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new c838f62 Optimize projected read buffer copies (#5)
c838f62 is described below
commit c838f628dbf90d30a7f559f81739e5dac750bf95
Author: QuakeWang <[email protected]>
AuthorDate: Tue May 19 10:14:33 2026 +0800
Optimize projected read buffer copies (#5)
---
core/src/bucket_reader.rs | 33 ++++++-
core/src/reader.rs | 228 +++++++++++++++++++++++++++++++---------------
core/src/reader_tests.rs | 48 ++++++++++
3 files changed, 235 insertions(+), 74 deletions(-)
diff --git a/core/src/bucket_reader.rs b/core/src/bucket_reader.rs
index 4a782ce..495cae1 100644
--- a/core/src/bucket_reader.rs
+++ b/core/src/bucket_reader.rs
@@ -565,6 +565,33 @@ impl ColumnPageReader {
page_data: Vec<u8>,
num_rows: usize,
) -> io::Result<Self> {
+ Self::new_with_page_data_start(
+ col_type,
+ encoding,
+ has_nulls,
+ const_value,
+ page_data,
+ 0,
+ num_rows,
+ )
+ }
+
+ pub(crate) fn new_with_page_data_start(
+ col_type: DataType,
+ encoding: u8,
+ has_nulls: bool,
+ const_value: Value,
+ data: Vec<u8>,
+ page_data_start: usize,
+ num_rows: usize,
+ ) -> io::Result<Self> {
+ if page_data_start > data.len() {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "column page data start out of bounds",
+ ));
+ }
+
let mut reader = ColumnPageReader {
col_type,
encoding,
@@ -573,8 +600,8 @@ impl ColumnPageReader {
dict_values: Vec::new(),
dict_bit_width: 0,
null_bitmap: Vec::new(),
- data: page_data,
- data_cursor: 0,
+ data,
+ data_cursor: page_data_start,
num_rows,
};
reader.init_page()?;
@@ -583,7 +610,7 @@ impl ColumnPageReader {
fn init_page(&mut self) -> io::Result<()> {
let null_bitmap_bytes = self.num_rows.div_ceil(8);
- let mut pos = 0;
+ let mut pos = self.data_cursor;
match self.encoding {
ENCODING_ALL_NULL => {}
diff --git a/core/src/reader.rs b/core/src/reader.rs
index 76b0697..be56a76 100644
--- a/core/src/reader.rs
+++ b/core/src/reader.rs
@@ -16,6 +16,7 @@
// under the License.
use std::io;
+use std::sync::Arc;
use arrow_array::ArrayRef;
use arrow_array::RecordBatch;
@@ -32,6 +33,38 @@ use crate::varint;
const COALESCE_GAP: u64 = 1024 * 1024;
const COALESCE_MAX_RANGE: u64 = 32 * 1024 * 1024;
+#[derive(Clone)]
+pub struct ReadRangeBuffer {
+ data: Arc<Vec<u8>>,
+ start: usize,
+ len: usize,
+}
+
+impl ReadRangeBuffer {
+ pub fn new(data: Arc<Vec<u8>>, start: usize, len: usize) ->
io::Result<Self> {
+ if start.checked_add(len).is_none_or(|end| end > data.len()) {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "read range buffer bounds exceed backing data",
+ ));
+ }
+
+ Ok(Self { data, start, len })
+ }
+
+ pub fn as_slice(&self) -> &[u8] {
+ &self.data[self.start..self.start + self.len]
+ }
+
+ pub fn len(&self) -> usize {
+ self.len
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.len == 0
+ }
+}
+
/// A random-access file abstraction for reading Mosaic files.
///
/// The `Sync` bound is required because the reader may call `read_at` from
@@ -44,64 +77,30 @@ pub trait InputFile: Sync {
/// This method must be safe to call concurrently from multiple threads.
fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()>;
- fn read_ranges(&self, ranges: &[(u64, usize)]) -> io::Result<Vec<Vec<u8>>>
{
- if ranges.is_empty() {
- return Ok(Vec::new());
- }
-
- // Build sorted index
- let mut indices: Vec<usize> = (0..ranges.len()).collect();
- indices.sort_unstable_by_key(|&i| ranges[i].0);
-
- // Merge ranges with gap <= COALESCE_GAP and total size <= COALESCE_MAX_RANGE
- struct MergedRange {
- start: u64,
- end: u64,
- members: Vec<usize>, // original indices
- }
-
- let mut merged: Vec<MergedRange> = Vec::new();
- for &idx in &indices {
- let (offset, len) = ranges[idx];
- let range_end = offset + len as u64;
+ fn read_ranges_shared(&self, ranges: &[(u64, usize)]) ->
io::Result<Vec<ReadRangeBuffer>> {
+ let (merged, fetched) = read_merged_ranges(self, ranges)?;
+ let fetched: Vec<Arc<Vec<u8>>> =
fetched.into_iter().map(Arc::new).collect();
- let should_merge = if let Some(last) = merged.last() {
- offset >= last.start
- && offset.saturating_sub(last.end) <= COALESCE_GAP
- && (range_end - last.start) <= COALESCE_MAX_RANGE
- } else {
- false
- };
-
- if should_merge {
- let last = merged.last_mut().unwrap();
- last.end = last.end.max(range_end);
- last.members.push(idx);
- } else {
- merged.push(MergedRange {
- start: offset,
- end: range_end,
- members: vec![idx],
- });
+ // Distribute views back to original order
+ let mut results: Vec<Option<ReadRangeBuffer>> =
Vec::with_capacity(ranges.len());
+ results.resize_with(ranges.len(), || None);
+ for (mi, mr) in merged.iter().enumerate() {
+ let data = fetched[mi].clone();
+ for &idx in &mr.members {
+ let (offset, len) = ranges[idx];
+ let rel_start = (offset - mr.start) as usize;
+ results[idx] = Some(ReadRangeBuffer::new(data.clone(),
rel_start, len)?);
}
}
- // Fetch merged ranges in parallel
- let fetched: Vec<io::Result<Vec<u8>>> = std::thread::scope(|s| {
- let handles: Vec<_> = merged
- .iter()
- .map(|mr| {
- s.spawn(|| {
- let len = (mr.end - mr.start) as usize;
- let mut buf = vec![0u8; len];
- self.read_at(mr.start, &mut buf)?;
- Ok(buf)
- })
- })
- .collect();
- handles.into_iter().map(|h| h.join().unwrap()).collect()
- });
- let fetched: Vec<Vec<u8>> =
fetched.into_iter().collect::<io::Result<_>>()?;
+ results
+ .into_iter()
+ .collect::<Option<Vec<_>>>()
+ .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing
range buffer"))
+ }
+
+ fn read_ranges(&self, ranges: &[(u64, usize)]) -> io::Result<Vec<Vec<u8>>>
{
+ let (merged, fetched) = read_merged_ranges(self, ranges)?;
// Distribute slices back to original order
let mut results: Vec<Vec<u8>> = Vec::with_capacity(ranges.len());
@@ -119,6 +118,68 @@ pub trait InputFile: Sync {
}
}
+struct MergedRange {
+ start: u64,
+ end: u64,
+ members: Vec<usize>,
+}
+
+fn read_merged_ranges<I: InputFile + ?Sized>(
+ input: &I,
+ ranges: &[(u64, usize)],
+) -> io::Result<(Vec<MergedRange>, Vec<Vec<u8>>)> {
+ if ranges.is_empty() {
+ return Ok((Vec::new(), Vec::new()));
+ }
+
+ let mut indices: Vec<usize> = (0..ranges.len()).collect();
+ indices.sort_unstable_by_key(|&i| ranges[i].0);
+
+ let mut merged: Vec<MergedRange> = Vec::new();
+ for &idx in &indices {
+ let (offset, len) = ranges[idx];
+ let range_end = offset + len as u64;
+
+ let should_merge = if let Some(last) = merged.last() {
+ offset >= last.start
+ && offset.saturating_sub(last.end) <= COALESCE_GAP
+ && (range_end - last.start) <= COALESCE_MAX_RANGE
+ } else {
+ false
+ };
+
+ if should_merge {
+ let last = merged.last_mut().unwrap();
+ last.end = last.end.max(range_end);
+ last.members.push(idx);
+ } else {
+ merged.push(MergedRange {
+ start: offset,
+ end: range_end,
+ members: vec![idx],
+ });
+ }
+ }
+
+ let fetched: Vec<io::Result<Vec<u8>>> = std::thread::scope(|s| {
+ let handles: Vec<_> = merged
+ .iter()
+ .map(|mr| {
+ s.spawn(|| {
+ let len = (mr.end - mr.start) as usize;
+ let mut buf = vec![0u8; len];
+ input.read_at(mr.start, &mut buf)?;
+ Ok(buf)
+ })
+ })
+ .collect();
+ handles.into_iter().map(|h| h.join().unwrap()).collect()
+ });
+ let fetched = fetched.into_iter().collect::<io::Result<Vec<_>>>()?;
+
+ Ok((merged, fetched))
+}
+
pub struct RowGroupMeta {
pub num_rows: usize,
pub bucket_offsets: Vec<u64>,
@@ -382,14 +443,13 @@ impl<I: InputFile> MosaicReader<I> {
}
}
- let page_data = page_content[ppos..].to_vec();
-
- ColumnPageReader::new(
+ ColumnPageReader::new_with_page_data_start(
col_type.clone(),
encoding,
has_nulls,
const_value,
- page_data,
+ page_content,
+ ppos,
num_rows,
)
}
@@ -523,7 +583,7 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
}
// Round 1: batch read all directories + monolithic blobs
- let r1_buffers = self.input.read_ranges(&r1_ranges)?;
+ let r1_buffers = self.input.read_ranges_shared(&r1_ranges)?;
// Process Round 1 results, build Round 2 ranges for paged bucket slots
let mut bucket_states: Vec<Option<BucketState>> =
@@ -541,14 +601,14 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
let mut paged_dir_info: Vec<Option<(Vec<usize>, Vec<u64>)>> =
vec![None; self.num_buckets];
for (ri, &b) in r1_bucket_ids.iter().enumerate() {
- let buf = &r1_buffers[ri];
+ let buf = r1_buffers[ri].as_slice();
match bucket_kinds[b] {
BucketLayout::Monolithic {
uncompressed_size, ..
} => {
let global_indices = &self.schema.bucket_to_global[b];
let bucket_data = match self.compression {
- COMPRESSION_NONE => buf.clone(),
+ COMPRESSION_NONE => buf.to_vec(),
COMPRESSION_ZSTD => zstd::bulk::decompress(buf,
uncompressed_size)
.map_err(|e|
io::Error::new(io::ErrorKind::InvalidData, e))?,
_ => {
@@ -670,28 +730,47 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
// Round 2: batch read all paged column slots
if !r2_ranges.is_empty() {
- let r2_buffers = self.input.read_ranges(&r2_ranges)?;
+ let r2_buffers = self.input.read_ranges_shared(&r2_ranges)?;
- // Distribute slot data to per-bucket column readers
- // First, build a per-bucket map of col_idx -> slot_data
- let mut paged_slot_data: Vec<Vec<Option<Vec<u8>>>> =
+ struct SlotLocation {
+ group_idx: usize,
+ start: usize,
+ len: usize,
+ }
+
+ let mut slot_locations: Vec<Vec<Option<SlotLocation>>> =
Vec::with_capacity(self.num_buckets);
for b in 0..self.num_buckets {
let n = self.schema.bucket_to_global[b].len();
- paged_slot_data.push(vec![None; n]);
+ slot_locations.push((0..n).map(|_| None).collect());
}
for (group_idx, group) in r2_group_infos.iter().enumerate() {
- let buf = &r2_buffers[group_idx];
+ let buf = r2_buffers[group_idx].as_slice();
let group_base = r2_ranges[group_idx].0;
for info in group {
let (_, ref slot_file_offsets) =
paged_dir_info[info.bucket_id].as_ref().unwrap();
let (ref slot_sizes, _) =
paged_dir_info[info.bucket_id].as_ref().unwrap();
let rel_start = (slot_file_offsets[info.col_idx] -
group_base) as usize;
- let rel_end = rel_start + slot_sizes[info.col_idx];
- paged_slot_data[info.bucket_id][info.col_idx] =
- Some(buf[rel_start..rel_end].to_vec());
+ let slot_len = slot_sizes[info.col_idx];
+ let Some(rel_end) = rel_start.checked_add(slot_len) else {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "paged bucket: slot range overflows read buffer",
+ ));
+ };
+ if rel_end > buf.len() {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "paged bucket: slot range exceeds read buffer",
+ ));
+ }
+ slot_locations[info.bucket_id][info.col_idx] =
Some(SlotLocation {
+ group_idx,
+ start: rel_start,
+ len: slot_len,
+ });
}
}
@@ -727,7 +806,14 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
continue;
}
- let slot_data = paged_slot_data[b][i].as_ref().unwrap();
+ let location = slot_locations[b][i].as_ref().ok_or_else(||
{
+ io::Error::new(
+ io::ErrorKind::InvalidData,
+ "paged bucket: missing projected slot data",
+ )
+ })?;
+ let group_buffer =
r2_buffers[location.group_idx].as_slice();
+ let slot_data =
&group_buffer[location.start..location.start + location.len];
let column_reader =
Self::parse_column_slot(slot_data, &col_type,
meta.num_rows)?;
column_readers.push(Some(column_reader));
diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index 243360b..2c21483 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -2828,6 +2828,54 @@ fn test_read_ranges_coalesces_adjacent() {
assert_eq!(input.read_count.load(Ordering::Relaxed), 1);
}
+#[test]
+fn test_read_ranges_shared_reuses_coalesced_buffer() {
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ struct CountingInputFile {
+ data: Vec<u8>,
+ read_count: AtomicUsize,
+ }
+
+ impl InputFile for CountingInputFile {
+ fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
+ self.read_count.fetch_add(1, Ordering::Relaxed);
+ let start = offset as usize;
+ buf.copy_from_slice(&self.data[start..start + buf.len()]);
+ Ok(())
+ }
+ }
+
+ let data: Vec<u8> = (0..512).map(|i| (i % 256) as u8).collect();
+ let input = CountingInputFile {
+ data: data.clone(),
+ read_count: AtomicUsize::new(0),
+ };
+
+ let ranges = vec![(0u64, 64usize), (64, 64), (200, 32)];
+ let results = input.read_ranges_shared(&ranges).unwrap();
+
+ assert_eq!(input.read_count.load(Ordering::Relaxed), 1);
+ assert_eq!(results[0].as_slice(), &data[0..64]);
+ assert_eq!(results[1].as_slice(), &data[64..128]);
+ assert_eq!(results[2].as_slice(), &data[200..232]);
+ assert!(Arc::ptr_eq(&results[0].data, &results[1].data));
+ assert!(Arc::ptr_eq(&results[0].data, &results[2].data));
+}
+
+#[test]
+fn test_read_range_buffer_new_validates_bounds() {
+ let data = Arc::new(vec![1, 2, 3, 4]);
+
+ let buffer = ReadRangeBuffer::new(data.clone(), 1, 2).unwrap();
+ assert_eq!(buffer.as_slice(), &[2, 3]);
+
+ match ReadRangeBuffer::new(data, 3, 2) {
+ Ok(_) => panic!("expected out-of-bounds range to fail"),
+ Err(err) => assert_eq!(err.kind(), io::ErrorKind::InvalidInput),
+ }
+}
+
#[test]
fn test_read_ranges_splits_large_gap() {
use std::sync::atomic::{AtomicUsize, Ordering};