This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new fa792b7 fix: preserve paged all-null projections (#7)
fa792b7 is described below
commit fa792b70e6589985d0882ec983c234c8d1cef96d
Author: QuakeWang <[email protected]>
AuthorDate: Tue May 19 15:41:27 2026 +0800
fix: preserve paged all-null projections (#7)
---
core/src/reader.rs | 25 ++++++++++++++-----------
core/src/reader_tests.rs | 46 ++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 60 insertions(+), 11 deletions(-)
diff --git a/core/src/reader.rs b/core/src/reader.rs
index be56a76..a759ae5 100644
--- a/core/src/reader.rs
+++ b/core/src/reader.rs
@@ -599,6 +599,7 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
// Per-bucket directory parse results (slot_sizes, slot_file_offsets) for paged buckets
let mut paged_dir_info: Vec<Option<(Vec<usize>, Vec<u64>)>> =
vec![None; self.num_buckets];
+ let mut partial_paged_buckets: Vec<usize> = Vec::new();
for (ri, &b) in r1_bucket_ids.iter().enumerate() {
let buf = r1_buffers[ri].as_slice();
@@ -722,15 +723,20 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
}
paged_dir_info[b] = Some((slot_sizes, slot_file_offsets));
+ partial_paged_buckets.push(b);
}
}
BucketLayout::Empty => {}
}
}
- // Round 2: batch read all paged column slots
- if !r2_ranges.is_empty() {
- let r2_buffers = self.input.read_ranges_shared(&r2_ranges)?;
+ // Round 2: batch read all paged column slots.
+ if !partial_paged_buckets.is_empty() {
+ let r2_buffers = if r2_ranges.is_empty() {
+ Vec::new()
+ } else {
+ self.input.read_ranges_shared(&r2_ranges)?
+ };
struct SlotLocation {
group_idx: usize,
@@ -749,9 +755,8 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
let buf = r2_buffers[group_idx].as_slice();
let group_base = r2_ranges[group_idx].0;
for info in group {
- let (_, ref slot_file_offsets) =
+ let (slot_sizes, slot_file_offsets) =
paged_dir_info[info.bucket_id].as_ref().unwrap();
- let (ref slot_sizes, _) = paged_dir_info[info.bucket_id].as_ref().unwrap();
let rel_start = (slot_file_offsets[info.col_idx] - group_base) as usize;
let slot_len = slot_sizes[info.col_idx];
let Some(rel_end) = rel_start.checked_add(slot_len) else {
@@ -774,14 +779,12 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
}
}
- // Build ColumnPageReaders for each paged bucket
- for b in 0..self.num_buckets {
- if !matches!(bucket_kinds[b], BucketLayout::Paged { .. }) {
- continue;
- }
+ // Build ColumnPageReaders for partial paged buckets. ALL_NULL slots do not
+ // need round 2 IO, but still need readers when projected.
+ for &b in &partial_paged_buckets {
let global_indices = &self.schema.bucket_to_global[b];
let num_columns = global_indices.len();
- let (ref slot_sizes, _) = paged_dir_info[b].as_ref().unwrap();
+ let (slot_sizes, _) = paged_dir_info[b].as_ref().unwrap();
let mut column_readers: Vec<Option<ColumnPageReader>> =
Vec::with_capacity(num_columns);
diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index 2c21483..b519379 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -3239,6 +3239,52 @@ fn test_paged_all_null_columns_in_projection() {
}
}
+#[test]
+fn test_paged_all_null_only_projection() {
+ let columns = vec![
+ ("real".to_string(), DataType::Int32, true),
+ ("null_col".to_string(), DataType::Int64, true),
+ ];
+ let rows: Vec<Vec<Value>> = (0..100)
+ .map(|i| vec![Value::Integer(i), Value::Null])
+ .collect();
+
+ let out = MemOutputFile::new();
+ let mut writer = MosaicWriter::new(
+ out,
+ &columns_to_arrow_schema(&columns),
+ WriterOptions {
+ compression: COMPRESSION_ZSTD,
+ page_size_threshold: 1,
+ num_buckets: 1,
+ ..Default::default()
+ },
+ )
+ .unwrap();
+ write_values(&mut writer, &columns, &rows);
+ writer.close().unwrap();
+ let data = writer.output().buf.clone();
+ let len = data.len() as u64;
+ let reader = MosaicReader::new(ByteArrayInputFile::new(data), len).unwrap();
+
+ let idx_null = reader
+ .schema()
+ .columns
+ .iter()
+ .position(|c| c.name == "null_col")
+ .unwrap();
+ let mut rg = reader.row_group_reader_projected(0, &[idx_null]).unwrap();
+ let batch = rg.read_columns().unwrap();
+
+ assert_eq!(batch.num_rows(), 100);
+ assert_eq!(batch.num_columns(), 1);
+
+ let null_col = batch_col_i64(&batch, "null_col");
+ for i in 0..100usize {
+ assert!(null_col.is_null(i));
+ }
+}
+
#[test]
fn test_paged_adjacent_columns_coalesced_read() {
use std::sync::atomic::{AtomicUsize, Ordering};