alamb commented on code in PR #17861:
URL: https://github.com/apache/datafusion/pull/17861#discussion_r2988254405
##########
datafusion/datasource-avro/src/source.rs:
##########
@@ -56,22 +57,83 @@ impl AvroSource {
}
}
- fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static,
R>> {
+ fn open<R: std::io::BufRead>(
+ &self,
+ reader: R,
+ projection: Option<Vec<usize>>,
+ ) -> Result<Reader<R>> {
+ let mut builder = ReaderBuilder::new()
+ .with_batch_size(self.batch_size.expect("Batch size must set
before open"));
+ if let Some(projection) = projection {
+ builder = builder.with_projection(projection);
+ }
+ builder.build(reader).map_err(Into::into)
+ }
+
+ fn projected_file_schema(&self) -> SchemaRef {
let file_schema = self.table_schema.file_schema();
- let projection = Some(
+ if self.projection.file_indices.is_empty() {
+ return Arc::clone(file_schema);
+ }
+
+ Arc::new(Schema::new(
self.projection
.file_indices
.iter()
- .map(|&idx| file_schema.field(idx).name().clone())
+ .map(|idx| file_schema.field(*idx).clone())
.collect::<Vec<_>>(),
- );
- AvroReader::try_new(
- reader,
- &Arc::clone(self.table_schema.file_schema()),
- self.batch_size.expect("Batch size must set before open"),
- projection.as_ref(),
- )
+ ))
+ }
+
+ fn writer_projection_for_schema(
+ &self,
+ writer_schema: &Schema,
+ target_schema: &Schema,
+ ) -> Option<Vec<usize>> {
+ // `arrow-avro` accepts projection ordinals against the file's writer
schema,
+ // while DataFusion plans projection against the logical table schema.
Remap
+ // projected column names to writer ordinals so reader-level pushdown
still
+ // preserves DataFusion's existing name-based projection semantics.
+ let projection = target_schema
+ .fields()
+ .iter()
+ .filter_map(|field| {
+ writer_schema
+ .column_with_name(field.name())
+ .map(|(idx, _)| idx)
+ })
+ .collect::<Vec<_>>();
+
+ let identity_projection = projection.len() ==
writer_schema.fields().len()
+ && projection
+ .iter()
+ .enumerate()
+ .all(|(idx, value)| idx == *value);
+
+ (!identity_projection).then_some(projection)
+ }
+}
+
+fn coerce_batch_to_schema(
+ batch: &RecordBatch,
+ target_schema: SchemaRef,
+) -> Result<RecordBatch> {
Review Comment:
> That means the reader returns a physical batch that has already been
pruned in Avro writer-schema order, while the downstream DataFusion path still
assumes logical-schema-based column identity. The current
coerce_batch_to_schema step only reconstructs the batch by name into the
DataFusion logical schema shape.
Maybe this is due to the wrong schema being passed in? I don't fully follow
this argument — could you clarify which schema the reader is given here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]