This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new 8926646 fix: honor projection for monolithic buckets (#18)
8926646 is described below
commit 89266468d561a5d1706f6ceb6de3dddf5a729c3c
Author: QuakeWang <[email protected]>
AuthorDate: Wed May 20 19:10:55 2026 +0800
fix: honor projection for monolithic buckets (#18)
---
core/src/reader.rs | 12 ++++++++++-
core/src/reader_tests.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 66 insertions(+), 1 deletion(-)
diff --git a/core/src/reader.rs b/core/src/reader.rs
index d408519..ecf5ea7 100644
--- a/core/src/reader.rs
+++ b/core/src/reader.rs
@@ -846,6 +846,7 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
self.schema.clone(),
num_cols,
meta.num_rows,
+ projected,
))
}
}
@@ -866,6 +867,7 @@ pub struct RowGroupReader {
schema: MosaicSchema,
num_rows: usize,
num_columns: usize,
+ projected_columns: Vec<bool>,
}
impl RowGroupReader {
@@ -875,6 +877,7 @@ impl RowGroupReader {
schema: MosaicSchema,
num_columns: usize,
num_rows: usize,
+ projected_columns: Vec<bool>,
) -> Self {
let active_buckets: Vec<usize> = bucket_states
.iter()
@@ -888,6 +891,7 @@ impl RowGroupReader {
schema,
num_rows,
num_columns,
+ projected_columns,
}
}
@@ -906,14 +910,20 @@ impl RowGroupReader {
match state {
BucketState::Paged { column_readers } => {
for (local_idx, &global_idx) in global_indices.iter().enumerate() {
+ if !self.projected_columns[global_idx] {
+ continue;
+ }
if let Some(ref cr) = column_readers[local_idx] {
arrays[global_idx] = Some(cr.read_all()?);
}
}
}
- BucketState::Monolithic { reader, .. } => {
+ BucketState::Monolithic { reader } => {
let columns = reader.read_all_columns()?;
for (local_idx, &global_idx) in global_indices.iter().enumerate() {
+ if !self.projected_columns[global_idx] {
+ continue;
+ }
if local_idx < columns.len() {
arrays[global_idx] = Some(columns[local_idx].clone());
}
diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index cdbce0c..6c68331 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -2641,6 +2641,61 @@ fn test_read_columns_with_projection() {
}
}
+#[test]
+fn test_read_columns_with_projection_monolithic_bucket() {
+ let columns = vec![
+ ("a".to_string(), DataType::Int32, true),
+ ("b".to_string(), DataType::Utf8, true),
+ ("c".to_string(), DataType::Float64, true),
+ ];
+ let out = MemOutputFile::new();
+ let mut writer = MosaicWriter::new(
+ out,
+ &columns_to_arrow_schema(&columns),
+ WriterOptions {
+ compression: COMPRESSION_NONE,
+ num_buckets: 2,
+ ..Default::default()
+ },
+ )
+ .unwrap();
+
+ let rows: Vec<Vec<Value>> = (0..12)
+ .map(|i| {
+ vec![
+ Value::Integer(i),
+ Value::String(format!("b{}", i).into_bytes()),
+ Value::Double(i as f64 * 2.5),
+ ]
+ })
+ .collect();
+ write_values(&mut writer, &columns, &rows);
+ writer.close().unwrap();
+ let data = writer.output().buf.clone();
+ let len = data.len() as u64;
+ let reader = MosaicReader::new(ByteArrayInputFile::new(data), len).unwrap();
+
+ let col_b = reader
+ .schema()
+ .columns
+ .iter()
+ .position(|c| c.name == "b")
+ .unwrap();
+
+ let mut rg = reader.row_group_reader_projected(0, &[col_b]).unwrap();
+ let batch = rg.read_columns().unwrap();
+
+ assert_eq!(batch.num_rows(), 12);
+ assert_eq!(batch.num_columns(), 1);
+ assert!(batch.schema().index_of("a").is_err());
+ assert!(batch.schema().index_of("c").is_err());
+
+ let b = batch_col_string(&batch, "b");
+ for i in 0..12usize {
+ assert_eq!(b.value(i), format!("b{}", i));
+ }
+}
+
#[test]
fn test_read_columns_paged() {
let columns = vec![