This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new 8926646  fix: honor projection for monolithic buckets (#18)
8926646 is described below

commit 89266468d561a5d1706f6ceb6de3dddf5a729c3c
Author: QuakeWang <[email protected]>
AuthorDate: Wed May 20 19:10:55 2026 +0800

    fix: honor projection for monolithic buckets (#18)
---
 core/src/reader.rs       | 12 ++++++++++-
 core/src/reader_tests.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/core/src/reader.rs b/core/src/reader.rs
index d408519..ecf5ea7 100644
--- a/core/src/reader.rs
+++ b/core/src/reader.rs
@@ -846,6 +846,7 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
             self.schema.clone(),
             num_cols,
             meta.num_rows,
+            projected,
         ))
     }
 }
@@ -866,6 +867,7 @@ pub struct RowGroupReader {
     schema: MosaicSchema,
     num_rows: usize,
     num_columns: usize,
+    projected_columns: Vec<bool>,
 }
 
 impl RowGroupReader {
@@ -875,6 +877,7 @@ impl RowGroupReader {
         schema: MosaicSchema,
         num_columns: usize,
         num_rows: usize,
+        projected_columns: Vec<bool>,
     ) -> Self {
         let active_buckets: Vec<usize> = bucket_states
             .iter()
@@ -888,6 +891,7 @@ impl RowGroupReader {
             schema,
             num_rows,
             num_columns,
+            projected_columns,
         }
     }
 
@@ -906,14 +910,20 @@ impl RowGroupReader {
             match state {
                 BucketState::Paged { column_readers } => {
                     for (local_idx, &global_idx) in global_indices.iter().enumerate() {
+                        if !self.projected_columns[global_idx] {
+                            continue;
+                        }
                         if let Some(ref cr) = column_readers[local_idx] {
                             arrays[global_idx] = Some(cr.read_all()?);
                         }
                     }
                 }
-                BucketState::Monolithic { reader, .. } => {
+                BucketState::Monolithic { reader } => {
                     let columns = reader.read_all_columns()?;
                     for (local_idx, &global_idx) in global_indices.iter().enumerate() {
+                        if !self.projected_columns[global_idx] {
+                            continue;
+                        }
                         if local_idx < columns.len() {
                             arrays[global_idx] = Some(columns[local_idx].clone());
                         }
diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index cdbce0c..6c68331 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -2641,6 +2641,61 @@ fn test_read_columns_with_projection() {
     }
 }
 
+#[test]
+fn test_read_columns_with_projection_monolithic_bucket() {
+    let columns = vec![
+        ("a".to_string(), DataType::Int32, true),
+        ("b".to_string(), DataType::Utf8, true),
+        ("c".to_string(), DataType::Float64, true),
+    ];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            num_buckets: 2,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows: Vec<Vec<Value>> = (0..12)
+        .map(|i| {
+            vec![
+                Value::Integer(i),
+                Value::String(format!("b{}", i).into_bytes()),
+                Value::Double(i as f64 * 2.5),
+            ]
+        })
+        .collect();
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+    let data = writer.output().buf.clone();
+    let len = data.len() as u64;
+    let reader = MosaicReader::new(ByteArrayInputFile::new(data), len).unwrap();
+
+    let col_b = reader
+        .schema()
+        .columns
+        .iter()
+        .position(|c| c.name == "b")
+        .unwrap();
+
+    let mut rg = reader.row_group_reader_projected(0, &[col_b]).unwrap();
+    let batch = rg.read_columns().unwrap();
+
+    assert_eq!(batch.num_rows(), 12);
+    assert_eq!(batch.num_columns(), 1);
+    assert!(batch.schema().index_of("a").is_err());
+    assert!(batch.schema().index_of("c").is_err());
+
+    let b = batch_col_string(&batch, "b");
+    for i in 0..12usize {
+        assert_eq!(b.value(i), format!("b{}", i));
+    }
+}
+
 #[test]
 fn test_read_columns_paged() {
     let columns = vec![

Reply via email to