QuakeWang commented on code in PR #197:
URL: https://github.com/apache/paimon-rust/pull/197#discussion_r3032091815


##########
crates/paimon/src/arrow/mod.rs:
##########
@@ -16,5 +16,113 @@
 // under the License.
 
 mod reader;
+pub(crate) mod schema_evolution;
 
 pub use crate::arrow::reader::ArrowReaderBuilder;
+
+use crate::spec::DataType as PaimonDataType;
+use arrow_schema::DataType as ArrowDataType;
+use arrow_schema::{Field as ArrowField, TimeUnit};
+use std::sync::Arc;
+
+/// Converts a Paimon [`DataType`](PaimonDataType) to an Arrow 
[`DataType`](ArrowDataType).
+pub fn paimon_type_to_arrow(dt: &PaimonDataType) -> 
crate::Result<ArrowDataType> {
+    Ok(match dt {
+        PaimonDataType::Boolean(_) => ArrowDataType::Boolean,
+        PaimonDataType::TinyInt(_) => ArrowDataType::Int8,
+        PaimonDataType::SmallInt(_) => ArrowDataType::Int16,
+        PaimonDataType::Int(_) => ArrowDataType::Int32,
+        PaimonDataType::BigInt(_) => ArrowDataType::Int64,
+        PaimonDataType::Float(_) => ArrowDataType::Float32,
+        PaimonDataType::Double(_) => ArrowDataType::Float64,
+        PaimonDataType::VarChar(_) | PaimonDataType::Char(_) => 
ArrowDataType::Utf8,
+        PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) => 
ArrowDataType::Binary,
+        PaimonDataType::Date(_) => ArrowDataType::Date32,
+        PaimonDataType::Time(_) => 
ArrowDataType::Time32(TimeUnit::Millisecond),
+        PaimonDataType::Timestamp(t) => {

Review Comment:
   I can see the idea behind changing `TIMESTAMP(0)` to `Timestamp(Second)`, 
but right now this only changes the reader side. The DataFusion provider-side 
schema mapping is still on the old behavior.
   
   At execution time we consume the batches from `read.to_arrow()` directly, 
while the provider schema still reports `TIMESTAMP(0)` as 
`Timestamp(Millisecond)`. So as soon as a table contains `TIMESTAMP(0)`, the 
declared schema and the actual batch schema no longer match.
   
   It would be better to update both sides together here,  this leaves a pretty 
subtle trap in the DataFusion integration.



##########
crates/paimon/src/arrow/reader.rs:
##########
@@ -242,108 +326,213 @@ fn read_single_file_stream(
         let mut batch_stream = batch_stream_builder.build()?;
         while let Some(batch) = batch_stream.next().await {
             let batch = batch?;
-            // Reorder columns from parquet-schema order to 
projected_column_names order,
-            // consistent with the normal read() path.
-            let reorder_indices: Vec<usize> = projected_column_names
-                .iter()
-                .filter_map(|name| batch.schema().index_of(name).ok())
-                .collect();
-            if reorder_indices.len() == batch.num_columns() {
-                yield batch.project(&reorder_indices).map_err(|e| {
-                    Error::UnexpectedError {
-                        message: "Failed to reorder projected 
columns".to_string(),
-                        source: Some(Box::new(e)),
+            let num_rows = batch.num_rows();
+            let batch_schema = batch.schema();
+
+            // Build output columns using index mapping (field-ID-based) or by 
name.
+            let mut columns: Vec<Arc<dyn arrow_array::Array>> = 
Vec::with_capacity(target_schema.fields().len());
+            for (i, target_field) in target_schema.fields().iter().enumerate() 
{
+                let source_col = if let Some(ref idx_map) = index_mapping {
+                    let data_idx = idx_map[i];
+                    if data_idx == NULL_FIELD_INDEX {
+                        None
+                    } else {
+                        // Find the column in the batch by the data field's 
name.
+                        let data_field = 
&data_fields.as_ref().unwrap()[data_idx as usize];
+                        batch_schema
+                            .index_of(data_field.name())
+                            .ok()
+                            .map(|col_idx| batch.column(col_idx))
+                    }
+                } else if let Some(ref df) = data_fields {
+                    // Identity mapping with data_fields present (e.g. renamed 
column).
+                    // Use data field name (old name in parquet) at the same 
position.
+                    batch_schema
+                        .index_of(df[i].name())
+                        .ok()
+                        .map(|col_idx| batch.column(col_idx))
+                } else {
+                    // No schema evolution — look up by target field name.
+                    batch_schema
+                        .index_of(target_field.name())
+                        .ok()
+                        .map(|col_idx| batch.column(col_idx))
+                };
+
+                match source_col {
+                    Some(col) => {
+                        if col.data_type() == target_field.data_type() {
+                            columns.push(col.clone());
+                        } else {
+                            // Type promotion: cast to target type.
+                            let casted = cast(col, 
target_field.data_type()).map_err(|e| {
+                                Error::UnexpectedError {
+                                    message: format!(
+                                        "Failed to cast column '{}' from {:?} 
to {:?}: {e}",
+                                        target_field.name(),
+                                        col.data_type(),
+                                        target_field.data_type()
+                                    ),
+                                    source: Some(Box::new(e)),
+                                }
+                            })?;
+                            columns.push(casted);
+                        }
+                    }
+                    None => {
+                        // Column missing from file: fill with nulls.
+                        let null_array = 
arrow_array::new_null_array(target_field.data_type(), num_rows);
+                        columns.push(null_array);
                     }
-                })?;
+                }
+            }
+
+            let result = if columns.is_empty() {
+                RecordBatch::try_new_with_options(
+                    target_schema.clone(),
+                    columns,
+                    
&arrow_array::RecordBatchOptions::new().with_row_count(Some(num_rows)),
+                )
             } else {
-                // Not all projected columns exist in this file (data 
evolution case),
-                // return as-is; the caller (merge_files_by_columns) handles 
missing columns.
-                yield batch;
+                RecordBatch::try_new(target_schema.clone(), columns)
             }
+            .map_err(|e| {
+                Error::UnexpectedError {
+                    message: format!("Failed to build schema-evolved 
RecordBatch: {e}"),
+                    source: Some(Box::new(e)),
+                }
+            })?;
+            yield result;
         }
     }
     .boxed())
 }
 
 /// Merge multiple files column-wise for data evolution, streaming with 
bounded memory.
 ///
+/// Uses field IDs (not column names) to resolve which file provides which 
column,
+/// ensuring correctness across schema evolution (column rename, add, drop).
+///
 /// Opens all file readers simultaneously and maintains a cursor (current 
batch + offset)
 /// per file. Each poll slices up to `batch_size` rows from each file's 
current batch,
 /// assembles columns from the winning files, and yields the merged batch. 
When a file's
 /// current batch is exhausted, the next batch is read from its stream on 
demand.
 fn merge_files_by_columns(
     file_io: &FileIO,
     split: &DataSplit,
-    projected_column_names: &[String],
-    table_field_names: &[String],
+    read_type: &[DataField],
+    table_fields: &[DataField],
+    schema_manager: SchemaManager,
+    table_schema_id: i64,
     batch_size: Option<usize>,
 ) -> crate::Result<ArrowRecordBatchStream> {
     let data_files = split.data_files();
     if data_files.is_empty() {
         return Ok(futures::stream::empty().boxed());
     }
 
-    // Determine which columns each file provides and resolve conflicts by 
max_sequence_number.
-    // column_name -> (file_index, max_sequence_number)
-    let mut column_source: HashMap<String, (usize, i64)> = HashMap::new();
+    // Build owned data for the stream closure.
+    let file_io = file_io.clone();
+    let split = split.clone();
+    let data_files: Vec<DataFileMeta> = data_files.to_vec();
+    let read_type = read_type.to_vec();
+    let table_fields = table_fields.to_vec();
+    let output_batch_size = batch_size.unwrap_or(1024);
+    let target_schema = build_target_arrow_schema(&read_type)?;
+
+    Ok(try_stream! {
+        // Pre-load schemas and collect field IDs + data_fields per file.
+        // file_idx -> (field_ids, Option<Vec<DataField>>)
+        let mut file_info: HashMap<usize, (Vec<i32>, Option<Vec<DataField>>)> 
= HashMap::new();
+
+        for (file_idx, file_meta) in data_files.iter().enumerate() {
+            let (field_ids, data_fields) = if file_meta.schema_id != 
table_schema_id {
+                let file_schema = 
schema_manager.schema(file_meta.schema_id).await?;
+                let file_fields = file_schema.fields();
+
+                let ids: Vec<i32> = if let Some(ref wc) = file_meta.write_cols 
{
+                    // write_cols names are from the file's schema at write 
time.
+                    wc.iter()
+                        .filter_map(|name| file_fields.iter().find(|f| 
f.name() == name).map(|f| f.id()))
+                        .collect()
+                } else {
+                    file_fields.iter().map(|f| f.id()).collect()
+                };
 
-    for (file_idx, file_meta) in data_files.iter().enumerate() {
-        let file_columns: Vec<String> = if let Some(ref wc) = 
file_meta.write_cols {
-            wc.clone()
-        } else {
-            table_field_names.to_vec()
-        };
+                (ids, Some(file_fields.to_vec()))
+            } else {
+                let ids: Vec<i32> = if let Some(ref wc) = file_meta.write_cols 
{
+                    // write_cols names are from the current table schema.
+                    wc.iter()
+                        .filter_map(|name| table_fields.iter().find(|f| 
f.name() == name).map(|f| f.id()))
+                        .collect()
+                } else {
+                    table_fields.iter().map(|f| f.id()).collect()
+                };
 
-        for col in &file_columns {
-            let entry = column_source
-                .entry(col.clone())
-                .or_insert((file_idx, i64::MIN));
-            if file_meta.max_sequence_number > entry.1 {
-                *entry = (file_idx, file_meta.max_sequence_number);
-            }
+                (ids, None)
+            };
+
+            file_info.insert(file_idx, (field_ids, data_fields));
         }
-    }
 
-    // For each file, determine which projected columns to read from it.
-    // file_index -> Vec<column_name>
-    let mut file_read_columns: HashMap<usize, Vec<String>> = HashMap::new();
-    for col_name in projected_column_names {
-        if let Some(&(file_idx, _)) = column_source.get(col_name) {
-            file_read_columns
-                .entry(file_idx)
-                .or_default()
-                .push(col_name.clone());
+        // Determine which file provides each field ID, resolving conflicts by 
max_sequence_number.
+        // field_id -> (file_index, max_sequence_number)
+        let mut field_id_source: HashMap<i32, (usize, i64)> = HashMap::new();
+        for (file_idx, file_meta) in data_files.iter().enumerate() {
+            let (ref field_ids, _) = file_info[&file_idx];
+            for &fid in field_ids {
+                let entry = field_id_source
+                    .entry(fid)
+                    .or_insert((file_idx, i64::MIN));
+                if file_meta.max_sequence_number > entry.1 {
+                    *entry = (file_idx, file_meta.max_sequence_number);
+                }
+            }
         }
-    }
 
-    // For each projected column, record (file_index, column_name) for 
assembly.
-    let column_plan: Vec<(Option<usize>, String)> = projected_column_names
-        .iter()
-        .map(|col_name| {
-            let file_idx = column_source.get(col_name).map(|&(idx, _)| idx);
-            (file_idx, col_name.clone())
-        })
-        .collect();
+        // For each projected field, determine which file provides it (by 
field ID).
+        // file_index -> Vec<column_name>  (target column names)
+        let mut file_read_columns: HashMap<usize, Vec<String>> = 
HashMap::new();
+        for field in &read_type {
+            if let Some(&(file_idx, _)) = field_id_source.get(&field.id()) {
+                file_read_columns
+                    .entry(file_idx)
+                    .or_default()
+                    .push(field.name().to_string());
+            }
+        }
 
-    // Collect which file indices we need to open streams for.
-    let active_file_indices: Vec<usize> = 
file_read_columns.keys().copied().collect();
+        // For each projected field, record (file_index, target_column_name) 
for assembly.
+        let column_plan: Vec<(Option<usize>, String)> = read_type
+            .iter()
+            .map(|field| {
+                let file_idx = field_id_source.get(&field.id()).map(|&(idx, 
_)| idx);
+                (file_idx, field.name().to_string())
+            })
+            .collect();
 
-    // Build owned data for the stream closure.
-    let file_io = file_io.clone();
-    let split = split.clone();
-    let data_files: Vec<DataFileMeta> = data_files.to_vec();
-    let projected_column_names = projected_column_names.to_vec();
-    let output_batch_size = batch_size.unwrap_or(1024);
+        // Collect which file indices we need to open streams for.
+        let active_file_indices: Vec<usize> = 
file_read_columns.keys().copied().collect();

Review Comment:
   I’m a bit worried about an edge case here.
   
   If the projection only asks for a newly added column that none of the files 
in this split contain yet, `file_read_columns` stays empty, 
`active_file_indices` becomes empty, and the merge loop exits immediately. In 
that case, we don’t return a NULL-filled column, we just skip the whole row 
group.
   
   That feels risky. In a `data evolution + add column` case, something like 
`SELECT new_col FROM ...` would silently lose the rows coming from older data. 
I think we still need to preserve the row count here and fill the projected 
column with NULLs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to