This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new fa792b7  fix: preserve paged all-null projections (#7)
fa792b7 is described below

commit fa792b70e6589985d0882ec983c234c8d1cef96d
Author: QuakeWang <[email protected]>
AuthorDate: Tue May 19 15:41:27 2026 +0800

    fix: preserve paged all-null projections (#7)
---
 core/src/reader.rs       | 25 ++++++++++++++-----------
 core/src/reader_tests.rs | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/core/src/reader.rs b/core/src/reader.rs
index be56a76..a759ae5 100644
--- a/core/src/reader.rs
+++ b/core/src/reader.rs
@@ -599,6 +599,7 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
 
         // Per-bucket directory parse results (slot_sizes, slot_file_offsets) 
for paged buckets
         let mut paged_dir_info: Vec<Option<(Vec<usize>, Vec<u64>)>> = 
vec![None; self.num_buckets];
+        let mut partial_paged_buckets: Vec<usize> = Vec::new();
 
         for (ri, &b) in r1_bucket_ids.iter().enumerate() {
             let buf = r1_buffers[ri].as_slice();
@@ -722,15 +723,20 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
                         }
 
                         paged_dir_info[b] = Some((slot_sizes, 
slot_file_offsets));
+                        partial_paged_buckets.push(b);
                     }
                 }
                 BucketLayout::Empty => {}
             }
         }
 
-        // Round 2: batch read all paged column slots
-        if !r2_ranges.is_empty() {
-            let r2_buffers = self.input.read_ranges_shared(&r2_ranges)?;
+        // Round 2: batch read all paged column slots.
+        if !partial_paged_buckets.is_empty() {
+            let r2_buffers = if r2_ranges.is_empty() {
+                Vec::new()
+            } else {
+                self.input.read_ranges_shared(&r2_ranges)?
+            };
 
             struct SlotLocation {
                 group_idx: usize,
@@ -749,9 +755,8 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
                 let buf = r2_buffers[group_idx].as_slice();
                 let group_base = r2_ranges[group_idx].0;
                 for info in group {
-                    let (_, ref slot_file_offsets) =
+                    let (slot_sizes, slot_file_offsets) =
                         paged_dir_info[info.bucket_id].as_ref().unwrap();
-                    let (ref slot_sizes, _) = 
paged_dir_info[info.bucket_id].as_ref().unwrap();
                     let rel_start = (slot_file_offsets[info.col_idx] - 
group_base) as usize;
                     let slot_len = slot_sizes[info.col_idx];
                     let Some(rel_end) = rel_start.checked_add(slot_len) else {
@@ -774,14 +779,12 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
                 }
             }
 
-            // Build ColumnPageReaders for each paged bucket
-            for b in 0..self.num_buckets {
-                if !matches!(bucket_kinds[b], BucketLayout::Paged { .. }) {
-                    continue;
-                }
+            // Build ColumnPageReaders for partial paged buckets. ALL_NULL 
slots do not
+            // need round 2 IO, but still need readers when projected.
+            for &b in &partial_paged_buckets {
                 let global_indices = &self.schema.bucket_to_global[b];
                 let num_columns = global_indices.len();
-                let (ref slot_sizes, _) = paged_dir_info[b].as_ref().unwrap();
+                let (slot_sizes, _) = paged_dir_info[b].as_ref().unwrap();
 
                 let mut column_readers: Vec<Option<ColumnPageReader>> =
                     Vec::with_capacity(num_columns);
diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index 2c21483..b519379 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -3239,6 +3239,52 @@ fn test_paged_all_null_columns_in_projection() {
     }
 }
 
+#[test] // Regression test for #7: projection containing only an ALL_NULL column.
+fn test_paged_all_null_only_projection() {
+    let columns = vec![
+        ("real".to_string(), DataType::Int32, true),
+        ("null_col".to_string(), DataType::Int64, true),
+    ];
+    let rows: Vec<Vec<Value>> = (0..100)
+        .map(|i| vec![Value::Integer(i), Value::Null]) // null_col is null in every row
+        .collect();
+
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_ZSTD,
+            page_size_threshold: 1, // presumably forces the paged layout; TODO confirm
+            num_buckets: 1, // single bucket, so both columns share it
+            ..Default::default()
+        },
+    )
+    .unwrap();
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+    let data = writer.output().buf.clone();
+    let len = data.len() as u64;
+    let reader = MosaicReader::new(ByteArrayInputFile::new(data), 
len).unwrap();
+
+    let idx_null = reader // the only column projected below
+        .schema()
+        .columns
+        .iter()
+        .position(|c| c.name == "null_col")
+        .unwrap();
+    let mut rg = reader.row_group_reader_projected(0, &[idx_null]).unwrap();
+    let batch = rg.read_columns().unwrap(); // the all-null column must still be returned
+
+    assert_eq!(batch.num_rows(), 100);
+    assert_eq!(batch.num_columns(), 1);
+
+    let null_col = batch_col_i64(&batch, "null_col");
+    for i in 0..100usize {
+        assert!(null_col.is_null(i));
+    }
+}
+
 #[test]
 fn test_paged_adjacent_columns_coalesced_read() {
     use std::sync::atomic::{AtomicUsize, Ordering};

Reply via email to