JingsongLi commented on code in PR #197:
URL: https://github.com/apache/paimon-rust/pull/197#discussion_r3032239910
##########
crates/paimon/src/arrow/reader.rs:
##########
@@ -242,108 +326,213 @@ fn read_single_file_stream(
let mut batch_stream = batch_stream_builder.build()?;
while let Some(batch) = batch_stream.next().await {
let batch = batch?;
- // Reorder columns from parquet-schema order to
projected_column_names order,
- // consistent with the normal read() path.
- let reorder_indices: Vec<usize> = projected_column_names
- .iter()
- .filter_map(|name| batch.schema().index_of(name).ok())
- .collect();
- if reorder_indices.len() == batch.num_columns() {
- yield batch.project(&reorder_indices).map_err(|e| {
- Error::UnexpectedError {
- message: "Failed to reorder projected
columns".to_string(),
- source: Some(Box::new(e)),
+ let num_rows = batch.num_rows();
+ let batch_schema = batch.schema();
+
+ // Build output columns using index mapping (field-ID-based) or by
name.
+ let mut columns: Vec<Arc<dyn arrow_array::Array>> =
Vec::with_capacity(target_schema.fields().len());
+ for (i, target_field) in target_schema.fields().iter().enumerate()
{
+ let source_col = if let Some(ref idx_map) = index_mapping {
+ let data_idx = idx_map[i];
+ if data_idx == NULL_FIELD_INDEX {
+ None
+ } else {
+ // Find the column in the batch by the data field's
name.
+ let data_field =
&data_fields.as_ref().unwrap()[data_idx as usize];
+ batch_schema
+ .index_of(data_field.name())
+ .ok()
+ .map(|col_idx| batch.column(col_idx))
+ }
+ } else if let Some(ref df) = data_fields {
+ // Identity mapping with data_fields present (e.g. renamed
column).
+ // Use data field name (old name in parquet) at the same
position.
+ batch_schema
+ .index_of(df[i].name())
+ .ok()
+ .map(|col_idx| batch.column(col_idx))
+ } else {
+ // No schema evolution — look up by target field name.
+ batch_schema
+ .index_of(target_field.name())
+ .ok()
+ .map(|col_idx| batch.column(col_idx))
+ };
+
+ match source_col {
+ Some(col) => {
+ if col.data_type() == target_field.data_type() {
+ columns.push(col.clone());
+ } else {
+ // Type promotion: cast to target type.
+ let casted = cast(col,
target_field.data_type()).map_err(|e| {
+ Error::UnexpectedError {
+ message: format!(
+ "Failed to cast column '{}' from {:?}
to {:?}: {e}",
+ target_field.name(),
+ col.data_type(),
+ target_field.data_type()
+ ),
+ source: Some(Box::new(e)),
+ }
+ })?;
+ columns.push(casted);
+ }
+ }
+ None => {
+ // Column missing from file: fill with nulls.
+ let null_array =
arrow_array::new_null_array(target_field.data_type(), num_rows);
+ columns.push(null_array);
}
- })?;
+ }
+ }
+
+ let result = if columns.is_empty() {
+ RecordBatch::try_new_with_options(
+ target_schema.clone(),
+ columns,
+
&arrow_array::RecordBatchOptions::new().with_row_count(Some(num_rows)),
+ )
} else {
- // Not all projected columns exist in this file (data
evolution case),
- // return as-is; the caller (merge_files_by_columns) handles
missing columns.
- yield batch;
+ RecordBatch::try_new(target_schema.clone(), columns)
}
+ .map_err(|e| {
+ Error::UnexpectedError {
+ message: format!("Failed to build schema-evolved
RecordBatch: {e}"),
+ source: Some(Box::new(e)),
+ }
+ })?;
+ yield result;
}
}
.boxed())
}
/// Merge multiple files column-wise for data evolution, streaming with
bounded memory.
///
+/// Uses field IDs (not column names) to resolve which file provides which
column,
+/// ensuring correctness across schema evolution (column rename, add, drop).
+///
/// Opens all file readers simultaneously and maintains a cursor (current
batch + offset)
/// per file. Each poll slices up to `batch_size` rows from each file's
current batch,
/// assembles columns from the winning files, and yields the merged batch.
When a file's
/// current batch is exhausted, the next batch is read from its stream on
demand.
fn merge_files_by_columns(
file_io: &FileIO,
split: &DataSplit,
- projected_column_names: &[String],
- table_field_names: &[String],
+ read_type: &[DataField],
+ table_fields: &[DataField],
+ schema_manager: SchemaManager,
+ table_schema_id: i64,
batch_size: Option<usize>,
) -> crate::Result<ArrowRecordBatchStream> {
let data_files = split.data_files();
if data_files.is_empty() {
return Ok(futures::stream::empty().boxed());
}
- // Determine which columns each file provides and resolve conflicts by
max_sequence_number.
- // column_name -> (file_index, max_sequence_number)
- let mut column_source: HashMap<String, (usize, i64)> = HashMap::new();
+ // Build owned data for the stream closure.
+ let file_io = file_io.clone();
+ let split = split.clone();
+ let data_files: Vec<DataFileMeta> = data_files.to_vec();
+ let read_type = read_type.to_vec();
+ let table_fields = table_fields.to_vec();
+ let output_batch_size = batch_size.unwrap_or(1024);
+ let target_schema = build_target_arrow_schema(&read_type)?;
+
+ Ok(try_stream! {
+ // Pre-load schemas and collect field IDs + data_fields per file.
+ // file_idx -> (field_ids, Option<Vec<DataField>>)
+ let mut file_info: HashMap<usize, (Vec<i32>, Option<Vec<DataField>>)>
= HashMap::new();
+
+ for (file_idx, file_meta) in data_files.iter().enumerate() {
+ let (field_ids, data_fields) = if file_meta.schema_id !=
table_schema_id {
+ let file_schema =
schema_manager.schema(file_meta.schema_id).await?;
+ let file_fields = file_schema.fields();
+
+ let ids: Vec<i32> = if let Some(ref wc) = file_meta.write_cols
{
+ // write_cols names are from the file's schema at write
time.
+ wc.iter()
+ .filter_map(|name| file_fields.iter().find(|f|
f.name() == name).map(|f| f.id()))
+ .collect()
+ } else {
+ file_fields.iter().map(|f| f.id()).collect()
+ };
- for (file_idx, file_meta) in data_files.iter().enumerate() {
- let file_columns: Vec<String> = if let Some(ref wc) =
file_meta.write_cols {
- wc.clone()
- } else {
- table_field_names.to_vec()
- };
+ (ids, Some(file_fields.to_vec()))
+ } else {
+ let ids: Vec<i32> = if let Some(ref wc) = file_meta.write_cols
{
+ // write_cols names are from the current table schema.
+ wc.iter()
+ .filter_map(|name| table_fields.iter().find(|f|
f.name() == name).map(|f| f.id()))
+ .collect()
+ } else {
+ table_fields.iter().map(|f| f.id()).collect()
+ };
- for col in &file_columns {
- let entry = column_source
- .entry(col.clone())
- .or_insert((file_idx, i64::MIN));
- if file_meta.max_sequence_number > entry.1 {
- *entry = (file_idx, file_meta.max_sequence_number);
- }
+ (ids, None)
+ };
+
+ file_info.insert(file_idx, (field_ids, data_fields));
}
- }
- // For each file, determine which projected columns to read from it.
- // file_index -> Vec<column_name>
- let mut file_read_columns: HashMap<usize, Vec<String>> = HashMap::new();
- for col_name in projected_column_names {
- if let Some(&(file_idx, _)) = column_source.get(col_name) {
- file_read_columns
- .entry(file_idx)
- .or_default()
- .push(col_name.clone());
+ // Determine which file provides each field ID, resolving conflicts by
max_sequence_number.
+ // field_id -> (file_index, max_sequence_number)
+ let mut field_id_source: HashMap<i32, (usize, i64)> = HashMap::new();
+ for (file_idx, file_meta) in data_files.iter().enumerate() {
+ let (ref field_ids, _) = file_info[&file_idx];
+ for &fid in field_ids {
+ let entry = field_id_source
+ .entry(fid)
+ .or_insert((file_idx, i64::MIN));
+ if file_meta.max_sequence_number > entry.1 {
+ *entry = (file_idx, file_meta.max_sequence_number);
+ }
+ }
}
- }
- // For each projected column, record (file_index, column_name) for
assembly.
- let column_plan: Vec<(Option<usize>, String)> = projected_column_names
- .iter()
- .map(|col_name| {
- let file_idx = column_source.get(col_name).map(|&(idx, _)| idx);
- (file_idx, col_name.clone())
- })
- .collect();
+ // For each projected field, determine which file provides it (by
field ID).
+ // file_index -> Vec<column_name> (target column names)
+ let mut file_read_columns: HashMap<usize, Vec<String>> =
HashMap::new();
+ for field in &read_type {
+ if let Some(&(file_idx, _)) = field_id_source.get(&field.id()) {
+ file_read_columns
+ .entry(file_idx)
+ .or_default()
+ .push(field.name().to_string());
+ }
+ }
- // Collect which file indices we need to open streams for.
- let active_file_indices: Vec<usize> =
file_read_columns.keys().copied().collect();
+ // For each projected field, record (file_index, target_column_name)
for assembly.
+ let column_plan: Vec<(Option<usize>, String)> = read_type
+ .iter()
+ .map(|field| {
+ let file_idx = field_id_source.get(&field.id()).map(|&(idx,
_)| idx);
+ (file_idx, field.name().to_string())
+ })
+ .collect();
- // Build owned data for the stream closure.
- let file_io = file_io.clone();
- let split = split.clone();
- let data_files: Vec<DataFileMeta> = data_files.to_vec();
- let projected_column_names = projected_column_names.to_vec();
- let output_batch_size = batch_size.unwrap_or(1024);
+ // Collect which file indices we need to open streams for.
+ let active_file_indices: Vec<usize> =
file_read_columns.keys().copied().collect();
Review Comment:
I will fix it and add tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]