getChan commented on code in PR #17861:
URL: https://github.com/apache/datafusion/pull/17861#discussion_r2988128076


##########
datafusion/datasource-avro/src/source.rs:
##########
@@ -56,22 +57,83 @@ impl AvroSource {
         }
     }
 
-    fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, 
R>> {
+    fn open<R: std::io::BufRead>(
+        &self,
+        reader: R,
+        projection: Option<Vec<usize>>,
+    ) -> Result<Reader<R>> {
+        let mut builder = ReaderBuilder::new()
+            .with_batch_size(self.batch_size.expect("Batch size must set 
before open"));
+        if let Some(projection) = projection {
+            builder = builder.with_projection(projection);
+        }
+        builder.build(reader).map_err(Into::into)
+    }
+
+    fn projected_file_schema(&self) -> SchemaRef {
         let file_schema = self.table_schema.file_schema();
-        let projection = Some(
+        if self.projection.file_indices.is_empty() {
+            return Arc::clone(file_schema);
+        }
+
+        Arc::new(Schema::new(
             self.projection
                 .file_indices
                 .iter()
-                .map(|&idx| file_schema.field(idx).name().clone())
+                .map(|idx| file_schema.field(*idx).clone())
                 .collect::<Vec<_>>(),
-        );
-        AvroReader::try_new(
-            reader,
-            &Arc::clone(self.table_schema.file_schema()),
-            self.batch_size.expect("Batch size must set before open"),
-            projection.as_ref(),
-        )
+        ))
+    }
+
+    fn writer_projection_for_schema(
+        &self,
+        writer_schema: &Schema,
+        target_schema: &Schema,
+    ) -> Option<Vec<usize>> {
+        // `arrow-avro` accepts projection ordinals against the file's writer 
schema,
+        // while DataFusion plans projection against the logical table schema. 
Remap
+        // projected column names to writer ordinals so reader-level pushdown 
still
+        // preserves DataFusion's existing name-based projection semantics.
+        let projection = target_schema
+            .fields()
+            .iter()
+            .filter_map(|field| {
+                writer_schema
+                    .column_with_name(field.name())
+                    .map(|(idx, _)| idx)
+            })
+            .collect::<Vec<_>>();
+
+        let identity_projection = projection.len() == 
writer_schema.fields().len()
+            && projection
+                .iter()
+                .enumerate()
+                .all(|(idx, value)| idx == *value);
+
+        (!identity_projection).then_some(projection)
+    }
+}
+
+fn coerce_batch_to_schema(
+    batch: &RecordBatch,
+    target_schema: SchemaRef,
+) -> Result<RecordBatch> {

Review Comment:
   It may look similar to `BatchAdapter`, but substituting it directly into the 
current Avro reader path breaks the projection handling. At the moment, 
`arrow_avro::ReaderBuilder::with_projection(...)` accepts ordinal projection 
against the Avro writer schema, not projection against DataFusion's logical 
schema.
   
   That means the reader returns a physical batch that has already been pruned 
in Avro writer-schema order, while the downstream DataFusion path still assumes 
logical-schema-based column identity. The current `coerce_batch_to_schema` step 
simply rebuilds the batch — matching columns by name — into the shape of 
DataFusion's logical schema.
   
   By contrast, if `BatchAdapter` is inserted at the current location, it 
builds a projector that treats the already-projected batch schema as its 
source schema, which then conflicts with downstream paths that still expect 
DataFusion logical indices.
   
   I agree this is a good refactoring candidate in principle, but for now I 
would prefer to keep the current implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to