itsjunetime commented on code in PR #12135:
URL: https://github.com/apache/datafusion/pull/12135#discussion_r1761651689
##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -167,55 +186,95 @@ impl SchemaAdapter for DefaultSchemaAdapter {
 /// The SchemaMapping struct holds a mapping from the file schema to the table schema
 /// and any necessary type conversions that need to be applied.
+///
+/// This needs both the projected table schema and the full table schema because its different
+/// functions have different needs. The `map_batch` function is only used by the ParquetOpener to
+/// produce a RecordBatch which has the projected schema, since that's the schema which is
+/// supposed to come out of the execution of this query. `map_partial_batch`, however, is used to
+/// adapt batches whose schemas may contain columns that are not in the projection, so it works
+/// against the full table schema instead.
 #[derive(Debug)]
 pub struct SchemaMapping {
-    /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
-    table_schema: SchemaRef,
-    /// Mapping from field index in `table_schema` to index in projected file_schema
+    /// The schema of the table. This is the expected schema after conversion, and it should
+    /// match the schema of the query result.
+    projected_table_schema: SchemaRef,
+    /// Mapping from field index in `projected_table_schema` to index in the projected
+    /// file_schema. They are `Option`s instead of plain `usize`s because the table could have
+    /// fields that don't exist in the file.
     field_mappings: Vec<Option<usize>>,
+    /// The entire table schema, as opposed to the `projected_table_schema` (which only contains
+    /// the columns that we are projecting out of this query). This contains all fields in the
+    /// table, regardless of whether they will be projected or not.
+    table_schema: SchemaRef,
 }
 impl SchemaMapper for SchemaMapping {
-    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
+    /// conversions. The produced RecordBatch has a schema that contains only the projected
+    /// columns, so if one needs a RecordBatch with a schema that references columns which are not
+    /// in the projection, it would be better to use `map_partial_batch`.
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
         let batch_rows = batch.num_rows();
         let batch_cols = batch.columns().to_vec();

         let cols = self
-            .table_schema
+            .projected_table_schema
+            // go through each field in the projected schema
             .fields()
             .iter()
+            // and zip it with the index that maps fields from the projected table schema to the
+            // projected file schema in `batch`
             .zip(&self.field_mappings)
-            .map(|(field, file_idx)| match file_idx {
-                Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
-                None => Ok(new_null_array(field.data_type(), batch_rows)),
+            // and for each one...
+            .map(|(field, file_idx)| {
+                file_idx.map_or_else(
+                    // If this field only exists in the table, and not in the file, then we know
+                    // that it's null, so just return that.
+                    || Ok(new_null_array(field.data_type(), batch_rows)),
+                    // However, if it does exist in both, then try to cast it to the correct
+                    // output type.
+                    |batch_idx| cast(&batch_cols[batch_idx], field.data_type()),
+                )
             })
             .collect::<datafusion_common::Result<Vec<_>, _>>()?;

         // Necessary to handle empty batches
         let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

-        let schema = self.table_schema.clone();
+        let schema = self.projected_table_schema.clone();
         let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
         Ok(record_batch)
     }
+    /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
+    /// contains the fields that exist in both the file schema and table schema.
     fn map_partial_batch(
         &self,
         batch: RecordBatch,
     ) -> datafusion_common::Result<RecordBatch> {
         let batch_cols = batch.columns().to_vec();
         let schema = batch.schema();

-        let mut cols = vec![];
-        let mut fields = vec![];
-        for (i, f) in schema.fields().iter().enumerate() {
-            let table_field = self.table_schema.field_with_name(f.name());
-            if let Ok(tf) = table_field {
-                cols.push(cast(&batch_cols[i], tf.data_type())?);
-                fields.push(tf.clone());
-            }
-        }
+        // for each field in the batch's schema...
+        let (cols, fields) = schema
+            .fields()
+            .iter()
+            .zip(batch_cols.iter())
+            .flat_map(|(field, batch_col)| {
+                self.table_schema
+                    // try to get the same field from the table schema that we have stored in self
+                    .field_with_name(field.name())
+                    // and if we don't have it, that's fine, ignore it. This *is* an unexpected
+                    // error if we do run into it (since a file schema should be wholly a subset
+                    // of the table schema) but these errors should've been resolved by now. I
+                    // guess.
Review Comment:
Thanks for pointing this out - I've updated the comment to reflect this.
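
For context, the column-mapping technique that `map_batch` implements can be sketched in isolation. This is not the PR's code: the `adapt_batch` helper, the example schemas, and the use of `arrow`'s own error type (instead of `datafusion_common::Result`) are assumptions made for illustration, but the null-fill-or-cast logic follows the diff above.

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef, Int32Array};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::{RecordBatch, RecordBatchOptions};

/// Illustrative sketch, not the PR's `SchemaMapping` code.
/// Adapt a batch read with the file's schema to `projected_table_schema`.
/// `field_mappings[i]` holds the batch column index for projected field `i`,
/// or `None` if the field does not exist in the file.
fn adapt_batch(
    batch: &RecordBatch,
    projected_table_schema: SchemaRef,
    field_mappings: &[Option<usize>],
) -> arrow::error::Result<RecordBatch> {
    let cols = projected_table_schema
        .fields()
        .iter()
        .zip(field_mappings)
        .map(|(field, file_idx)| match file_idx {
            // The field only exists in the table, so fill it with nulls.
            None => Ok(new_null_array(field.data_type(), batch.num_rows())),
            // The field exists in the file; cast it to the table's type.
            Some(idx) => cast(batch.column(*idx), field.data_type()),
        })
        .collect::<arrow::error::Result<Vec<ArrayRef>>>()?;

    // Carry the row count explicitly so zero-column batches stay correct.
    let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
    RecordBatch::try_new_with_options(projected_table_schema, cols, &options)
}

fn main() -> arrow::error::Result<()> {
    // The file only has "a" as Int32; the table wants "a" as Int64 plus "b".
    let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(
        file_schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let adapted = adapt_batch(&batch, table_schema, &[Some(0), None])?;
    assert_eq!(adapted.num_columns(), 2); // "b" comes back as all-null
    Ok(())
}
```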
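
Likewise, a minimal sketch of the filter-and-cast behavior described for `map_partial_batch`, under the same assumptions (the `adapt_partial_batch` helper and the example data are invented; only columns present in both schemas survive):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Illustrative sketch, not the PR's `SchemaMapping` code.
/// Keep only the batch columns whose names also exist in `table_schema`,
/// casting each survivor to the type the table declares for it.
fn adapt_partial_batch(
    batch: &RecordBatch,
    table_schema: &Schema,
) -> arrow::error::Result<RecordBatch> {
    let schema = batch.schema();
    let (cols, fields): (Vec<ArrayRef>, Vec<Field>) = schema
        .fields()
        .iter()
        .zip(batch.columns())
        // flat_map over an Option: columns the table doesn't know about
        // simply disappear from the output
        .flat_map(|(field, col)| {
            table_schema
                .field_with_name(field.name())
                .ok()
                .map(|tf| cast(col, tf.data_type()).map(|c| (c, tf.clone())))
        })
        .collect::<arrow::error::Result<Vec<_>>>()?
        .into_iter()
        .unzip();

    RecordBatch::try_new(Arc::new(Schema::new(fields)), cols)
}

fn main() -> arrow::error::Result<()> {
    // The file carries an extra column "x" that the table doesn't have.
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("x", DataType::Utf8, true),
        ])),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
            Arc::new(StringArray::from(vec!["p", "q", "r"])) as ArrayRef,
        ],
    )?;
    let table_schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

    let adapted = adapt_partial_batch(&batch, &table_schema)?;
    assert_eq!(adapted.num_columns(), 1); // "x" dropped, "a" cast to Int64
    Ok(())
}
```

The `flat_map` over an `Option` is what silently drops file columns the table schema doesn't know about, which is the behavior the review comment above is discussing.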