getChan commented on code in PR #17861:
URL: https://github.com/apache/datafusion/pull/17861#discussion_r2988124881


##########
datafusion/datasource-avro/src/source.rs:
##########
@@ -56,22 +57,83 @@ impl AvroSource {
         }
     }
 
-    fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, 
R>> {
+    fn open<R: std::io::BufRead>(
+        &self,
+        reader: R,
+        projection: Option<Vec<usize>>,
+    ) -> Result<Reader<R>> {
+        let mut builder = ReaderBuilder::new()
+            .with_batch_size(self.batch_size.expect("Batch size must set 
before open"));
+        if let Some(projection) = projection {
+            builder = builder.with_projection(projection);
+        }
+        builder.build(reader).map_err(Into::into)
+    }
+
+    fn projected_file_schema(&self) -> SchemaRef {
         let file_schema = self.table_schema.file_schema();
-        let projection = Some(
+        if self.projection.file_indices.is_empty() {
+            return Arc::clone(file_schema);
+        }
+
+        Arc::new(Schema::new(
             self.projection
                 .file_indices
                 .iter()
-                .map(|&idx| file_schema.field(idx).name().clone())
+                .map(|idx| file_schema.field(*idx).clone())
                 .collect::<Vec<_>>(),
-        );
-        AvroReader::try_new(
-            reader,
-            &Arc::clone(self.table_schema.file_schema()),
-            self.batch_size.expect("Batch size must set before open"),
-            projection.as_ref(),
-        )
+        ))
+    }
+
+    fn writer_projection_for_schema(
+        &self,
+        writer_schema: &Schema,
+        target_schema: &Schema,
+    ) -> Option<Vec<usize>> {
+        // `arrow-avro` accepts projection ordinals against the file's writer 
schema,
+        // while DataFusion plans projection against the logical table schema. 
Remap
+        // projected column names to writer ordinals so reader-level pushdown 
still
+        // preserves DataFusion's existing name-based projection semantics.
+        let projection = target_schema
+            .fields()
+            .iter()
+            .filter_map(|field| {
+                writer_schema
+                    .column_with_name(field.name())
+                    .map(|(idx, _)| idx)
+            })
+            .collect::<Vec<_>>();
+
+        let identity_projection = projection.len() == 
writer_schema.fields().len()
+            && projection
+                .iter()
+                .enumerate()
+                .all(|(idx, value)| idx == *value);
+
+        (!identity_projection).then_some(projection)
+    }
+}
+
+fn coerce_batch_to_schema(
+    batch: &RecordBatch,
+    target_schema: SchemaRef,
+) -> Result<RecordBatch> {

Review Comment:
   `BatchAdapter`와 유사해 보일 수는 있지만, 현재 Avro reader 경로에서는 그대로 대체하면 깨집니다. 현재 
`arrow-avro::ReaderBuilder::with_projection(...)`은 DataFusion logical schema 기준 
projection이 아니라, Avro writer schema 기준 ordinal projection만 지원합니다.
   
   그래서 reader가 반환하는 batch는 이미 Avro writer schema order / projection 기준으로 축소된 
physical batch이고, DataFusion 쪽 경로는 여전히 logical schema 기준 column identity를 전제로 
동작합니다. 기존 `coerce_batch_to_schema`는 이 지점에서 이름 기반으로 batch를 DataFusion logical 
schema shape에 맞게 재구성합니다.
   
   반면 `BatchAdapter`를 현재 위치에 넣으면 projected batch schema를 source schema로 보고 
projector를 만들기 때문에, DataFusion logical index를 기대하는 downstream 경로와 충돌합니다.
   
   방향 자체는 좋은 리팩터 후보라고 생각하지만, 현재로서는 본 구현을 유지하고자 합니다.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to