e1ijah1 commented on code in PR #6458:
URL: https://github.com/apache/arrow-datafusion/pull/6458#discussion_r1217033114
##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -489,38 +491,93 @@ impl SchemaAdapter {
field_mappings,
})
}
+
+ /// Creates a `SchemaMapping` that can be used to cast or map the columns
from the file schema
+ /// to the table schema, taking into account the provided projections.
+ pub fn map_schema_with_projection(
+ &self,
+ file_schema: &Schema,
+ projections: &[usize],
+ ) -> Result<(SchemaMapping, Vec<usize>)> {
+ let mut field_mappings: Vec<(usize, Option<DataType>)> = Vec::new();
+ let mut mapped: Vec<usize> = vec![];
+
+ for idx in projections {
+ let field = self.table_schema.field(*idx);
+ match file_schema.index_of(field.name().as_str()) {
+ Ok(mapped_idx)
+ if can_cast_types(
+ file_schema.field(mapped_idx).data_type(),
+ field.data_type(),
+ ) =>
+ {
+ field_mappings.push((*idx,
Some(field.data_type().clone())));
+ mapped.push(mapped_idx);
+ }
+ Ok(mapped_idx) => {
+ return Err(DataFusionError::Plan(format!(
+ "Cannot cast file schema field {} of type {:?} to
table schema field of type {:?}",
+ field.name(),
+ file_schema.field(mapped_idx).data_type(),
+ field.data_type()
+ )));
+ }
+ Err(_) => {
+ field_mappings.push((*idx, None));
+ }
+ }
+ }
+ Ok((
+ SchemaMapping {
+ table_schema: self.table_schema.clone(),
+ field_mappings,
+ },
+ mapped,
+ ))
+ }
}
/// The SchemaMapping struct holds a mapping from the file schema to the table
schema
/// and any necessary type conversions that need to be applied.
#[derive(Debug)]
pub struct SchemaMapping {
- #[allow(dead_code)]
table_schema: SchemaRef,
- #[allow(dead_code)]
- field_mappings: Vec<(usize, DataType)>,
+ field_mappings: Vec<(usize, Option<DataType>)>,
}
impl SchemaMapping {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored
mapping and conversions.
- #[allow(dead_code)]
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
- let mut mapped_cols = Vec::with_capacity(self.field_mappings.len());
+ let mut cols = Vec::with_capacity(self.field_mappings.len());
+ let batch_schema = batch.schema();
Review Comment:
We need to look up each column's position in every batch, because the
column order in a batch is not guaranteed to match the column order in the
`file_schema`.
https://github.com/apache/arrow-datafusion/blob/aae7ec3bdb64bf0346249ccb9e44abdc29880904/datafusion/core/src/physical_plan/file_format/mod.rs#L418-L438
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]