jecsand838 commented on code in PR #17861:
URL: https://github.com/apache/datafusion/pull/17861#discussion_r2562427446


##########
datafusion/datasource-avro/src/source.rs:
##########
@@ -51,20 +50,29 @@ impl AvroSource {
         Self {
             table_schema: table_schema.into(),
             batch_size: None,
-            projection: None,
+            file_projection: None,
             metrics: ExecutionPlanMetricsSet::new(),
             projected_statistics: None,
             schema_adapter_factory: None,
         }
     }
 
-    fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, 
R>> {
-        AvroReader::try_new(
-            reader,
-            Arc::clone(self.table_schema.file_schema()),
-            self.batch_size.expect("Batch size must set before open"),
-            self.projection.clone(),
-        )
+    fn open<R: std::io::BufRead>(&self, reader: R) -> Result<Reader<R>> {
+        let schema = self.table_schema.file_schema().as_ref(); // todo - avro 
metadata loading
+
+        let projected_schema = if let Some(projection) = &self.file_projection 
{
+            &schema.project(projection)?
+        } else {
+            schema
+        };
+
+        let avro_schema = AvroSchema::try_from(projected_schema)?;
+
+        ReaderBuilder::new()
+            .with_reader_schema(avro_schema) // Used for projection on read.
+            .with_batch_size(self.batch_size.expect("Batch size must set 
before open"))
+            .build(reader)
+            .map_err(Into::into)
     }

Review Comment:
   The `source.rs` change just ensures the schema metadata provided upstream is 
used whenever projection is needed.
   
   The code below should get us there and seems to be working (at least) for 
me. 
   
   Once `arrow-avro` gets the `ReaderBuilder::with_projection()` method we can 
simplify this logic. I added details in the Todo comments.
   
   ```suggestion
       fn open<R: std::io::BufRead>(&self, reader: R) -> Result<Reader<R>> {
           // TODO: Once `ReaderBuilder::with_projection` is available, we 
should use it instead.
           //  This should be an easy change. We'd simply need to:
           //      1. Use the full file schema to generate the reader 
`AvroSchema`.
           //      2. Pass `&self.file_projection` into 
`ReaderBuilder::with_projection`.
           //      3. Remove the `build_projected_reader_schema` methods.
           ReaderBuilder::new()
               .with_reader_schema(self.build_projected_reader_schema()?)
               .with_batch_size(self.batch_size.expect("Batch size must set 
before open"))
               .build(reader)
               .map_err(Into::into)
       }
   
       fn build_projected_reader_schema(&self) -> Result<AvroSchema> {
           let file_schema = self.table_schema.file_schema().as_ref();
           // Fast path: no projection. If we have the original writer schema 
JSON
           // in metadata, just reuse it as-is without parsing.
           if self.file_projection.is_none() {
               return if let Some(avro_json) =
                   file_schema.metadata().get(SCHEMA_METADATA_KEY)
               {
                   Ok(AvroSchema::new(avro_json.clone()))
               } else {
                   // Fall back to deriving Avro from the full Arrow file 
schema, should be ok
                   // if not using projection.
                   Ok(AvroSchema::try_from(file_schema)
                       .map_err(Into::<DataFusionError>::into)?)
               };
           }
           // Use the writer Avro schema JSON tagged upstream to build a 
projected reader schema
           match file_schema.metadata().get(SCHEMA_METADATA_KEY) {
               Some(avro_json) => {
                   let mut schema_json: Value =
                       serde_json::from_str(avro_json).map_err(|e| {
                           DataFusionError::Execution(format!(
                               "Failed to parse Avro schema JSON from metadata: 
{e}"
                           ))
                       })?;
                   let obj = schema_json.as_object_mut().ok_or_else(|| {
                       DataFusionError::Execution(
                           "Top-level Avro schema JSON must be an 
object".to_string(),
                       )
                   })?;
                   let fields_val = obj.get_mut("fields").ok_or_else(|| {
                       DataFusionError::Execution(
                           "Top-level Avro schema JSON must contain a `fields` 
array"
                               .to_string(),
                       )
                   })?;
                   let fields_arr = fields_val.as_array_mut().ok_or_else(|| {
                       DataFusionError::Execution(
                           "Top-level Avro schema `fields` must be an 
array".to_string(),
                       )
                   })?;
                   // Move existing fields out so we can rebuild them in 
projected order.
                   let original_fields = std::mem::take(fields_arr);
                   let mut by_name: HashMap<String, Value> =
                       HashMap::with_capacity(original_fields.len());
                   for field in original_fields {
                       if let Some(name) = field.get("name").and_then(|v| 
v.as_str()) {
                           by_name.insert(name.to_string(), field);
                       }
                   }
                   // Rebuild `fields` in the same order as the projected Arrow 
schema.
                   let projection = self.file_projection.as_ref().ok_or_else(|| 
{
                       DataFusionError::Internal("checked file_projection is 
Some above".to_string())
                   })?;
                   let projected_schema = file_schema.project(projection)?;
                   let mut projected_fields =
                       Vec::with_capacity(projected_schema.fields().len());
                   for arrow_field in projected_schema.fields() {
                       let name = arrow_field.name();
                       let field = by_name.remove(name).ok_or_else(|| {
                           DataFusionError::Execution(format!(
                               "Projected field `{name}` not found in Avro 
writer schema"
                           ))
                       })?;
                       projected_fields.push(field);
                   }
                   *fields_val = Value::Array(projected_fields);
                   let projected_json =
                       serde_json::to_string(&schema_json).map_err(|e| {
                           DataFusionError::Execution(format!(
                               "Failed to serialize projected Avro schema JSON: 
{e}"
                           ))
                       })?;
                   Ok(AvroSchema::new(projected_json))
               }
               None => Err(DataFusionError::Execution(format!(
                   "Avro schema metadata ({SCHEMA_METADATA_KEY}) is missing 
from file schema, but is required for projection"
               ))),
           }
       }
   }
   ```



##########
datafusion/datasource-avro/src/mod.rs:
##########
@@ -26,9 +26,19 @@
 
 //! An [Avro](https://avro.apache.org/) based 
[`FileSource`](datafusion_datasource::file::FileSource) implementation and 
related functionality.
 
-pub mod avro_to_arrow;
 pub mod file_format;
 pub mod source;
 
-pub use apache_avro;
+use arrow::datatypes::Schema;
+pub use arrow_avro;
+use arrow_avro::reader::ReaderBuilder;
 pub use file_format::*;
+use std::io::{BufReader, Read};
+
+/// Read Avro schema given a reader
+pub fn read_avro_schema_from_reader<R: Read>(
+    reader: &mut R,
+) -> datafusion_common::Result<Schema> {
+    let avro_reader = ReaderBuilder::new().build(BufReader::new(reader))?;
+    Ok(avro_reader.schema().as_ref().clone())
+}

Review Comment:
   The code below should be able to get the schema metadata from the OCF header 
and tag it to the `datafusion_common::Result<Schema>` being returned for later 
use in `source.rs`. 
   
   I also added some unit tests and it all seems to be working for me. 
   
   **NOTE:** There's also a small change in `source.rs` we need to made. I'll 
leave a comment for that as well.
   
   ```suggestion
   use arrow_avro::schema::SCHEMA_METADATA_KEY;
   use datafusion_common::DataFusionError;
   use std::io::{BufReader, Read};
   use std::sync::Arc;
   
   /// Read Avro schema given a reader
   pub fn read_avro_schema_from_reader<R: Read>(
       reader: &mut R,
   ) -> datafusion_common::Result<Schema> {
       let avro_reader = ReaderBuilder::new().build(BufReader::new(reader))?;
       let schema_ref = avro_reader.schema();
       // Extract the raw Avro JSON schema from the OCF header.
       let raw_json = avro_reader
           .avro_header()
           .get(SCHEMA_METADATA_KEY.as_bytes())
           .map(|bytes| {
               std::str::from_utf8(bytes).map_err(|e| {
                   DataFusionError::Execution(format!(
                       "Invalid UTF-8 in Avro schema metadata 
({SCHEMA_METADATA_KEY}): {e}"
                   ))
               })
           })
           .transpose()?
           .map(str::to_owned);
       drop(avro_reader);
       if let Some(raw_json) = raw_json {
           let mut schema = Arc::unwrap_or_clone(schema_ref);
           // Insert the raw Avro JSON schema using `SCHEMA_METADATA_KEY`.
           // This should enable the avro schema metadata to be picked 
downstream.
           schema
               .metadata
               .insert(SCHEMA_METADATA_KEY.to_string(), raw_json);
           Ok(schema)
       } else {
           // Return error because Avro spec requires the Avro schema metadata 
to be present in the OCF header.
           Err(DataFusionError::Execution(format!(
               "Avro schema metadata ({SCHEMA_METADATA_KEY}) is missing from 
OCF header"
           )))
       }
   }
   
   #[cfg(test)]
   mod test {
       use super::*;
       use arrow::array::{BinaryArray, BooleanArray, Float64Array};
       use arrow::datatypes::DataType;
       use arrow_avro::reader::ReaderBuilder;
       use arrow_avro::schema::{AvroSchema, SCHEMA_METADATA_KEY};
       use datafusion_common::test_util::arrow_test_data;
       use datafusion_common::{DataFusionError, Result as DFResult};
       use serde_json::Value;
       use std::collections::HashMap;
       use std::fs::File;
       use std::io::BufReader;
   
       fn avro_test_file(name: &str) -> String {
           format!("{}/avro/{name}", arrow_test_data())
       }
   
       #[test]
       fn read_avro_schema_includes_avro_json_metadata() -> DFResult<()> {
           let path = avro_test_file("alltypes_plain.avro");
           let mut file = File::open(&path)?;
           let schema = read_avro_schema_from_reader(&mut file)?;
           let meta_json = schema
               .metadata()
               .get(SCHEMA_METADATA_KEY)
               .expect("schema metadata missing avro.schema entry");
           assert!(
               !meta_json.is_empty(),
               "avro.schema metadata should not be empty"
           );
           let mut raw = File::open(&path)?;
           let avro_reader = ReaderBuilder::new().build(BufReader::new(&mut 
raw))?;
           let header_json = avro_reader
               .avro_header()
               .get(SCHEMA_METADATA_KEY.as_bytes())
               .and_then(|bytes| std::str::from_utf8(bytes).ok())
               .expect("missing avro.schema metadata in OCF header");
           assert_eq!(
               meta_json, header_json,
               "schema metadata avro.schema should match OCF header"
           );
           Ok(())
       }
   
       #[test]
       fn read_and_project_using_schema_metadata() -> DFResult<()> {
           let path = avro_test_file("alltypes_dictionary.avro");
           let mut file = File::open(&path)?;
           let file_schema = read_avro_schema_from_reader(&mut file)?;
           let projected_field_names = vec!["string_col", "double_col", 
"bool_col"];
           let avro_json = file_schema
               .metadata()
               .get(SCHEMA_METADATA_KEY)
               .expect("schema metadata missing avro.schema entry");
           let projected_avro_schema =
               build_projected_reader_schema(avro_json, 
&projected_field_names)?;
           let mut reader = ReaderBuilder::new()
               .with_reader_schema(projected_avro_schema)
               .with_batch_size(64)
               .build(BufReader::new(File::open(&path)?))?;
           let batch = reader.next().expect("no batch produced")?;
           assert_eq!(3, batch.num_columns());
           assert_eq!(2, batch.num_rows());
           let schema = batch.schema();
           assert_eq!("string_col", schema.field(0).name());
           assert_eq!(&DataType::Binary, schema.field(0).data_type());
           let col = batch
               .column(0)
               .as_any()
               .downcast_ref::<BinaryArray>()
               .expect("column 0 not BinaryArray");
           assert_eq!("0".as_bytes(), col.value(0));
           assert_eq!("1".as_bytes(), col.value(1));
           assert_eq!("double_col", schema.field(1).name());
           assert_eq!(&DataType::Float64, schema.field(1).data_type());
           let col = batch
               .column(1)
               .as_any()
               .downcast_ref::<Float64Array>()
               .expect("column 1 not Float64Array");
           assert_eq!(0.0, col.value(0));
           assert_eq!(10.1, col.value(1));
           assert_eq!("bool_col", schema.field(2).name());
           assert_eq!(&DataType::Boolean, schema.field(2).data_type());
           let col = batch
               .column(2)
               .as_any()
               .downcast_ref::<BooleanArray>()
               .expect("column 2 not BooleanArray");
           assert!(col.value(0));
           assert!(!col.value(1));
           Ok(())
       }
   
       fn build_projected_reader_schema(
           avro_json: &str,
           projected_field_names: &[&str],
       ) -> DFResult<AvroSchema> {
           let mut schema_json: Value = 
serde_json::from_str(avro_json).map_err(|e| {
               DataFusionError::Execution(format!(
                   "Failed to parse Avro schema JSON from metadata: {e}"
               ))
           })?;
           let obj = schema_json.as_object_mut().ok_or_else(|| {
               DataFusionError::Execution(
                   "Top-level Avro schema JSON is not an object".to_string(),
               )
           })?;
           let fields_val = obj.get_mut("fields").ok_or_else(|| {
               DataFusionError::Execution(
                   "Top-level Avro schema JSON has no `fields` key".to_string(),
               )
           })?;
           let fields = fields_val.as_array_mut().ok_or_else(|| {
               DataFusionError::Execution(
                   "Top-level Avro schema `fields` is not an array".to_string(),
               )
           })?;
           let mut by_name: HashMap<String, Value> = HashMap::new();
           for field in fields.iter() {
               if let Some(name) = field.get("name").and_then(|v| v.as_str()) {
                   by_name.insert(name.to_string(), field.clone());
               }
           }
           let mut projected_fields = 
Vec::with_capacity(projected_field_names.len());
           for name in projected_field_names {
               let Some(field) = by_name.get(*name) else {
                   return Err(DataFusionError::Execution(format!(
                       "Projected field `{name}` not found in Avro writer 
schema"
                   )));
               };
               projected_fields.push(field.clone());
           }
           *fields_val = Value::Array(projected_fields);
           let projected_json = serde_json::to_string(&schema_json).map_err(|e| 
{
               DataFusionError::Execution(format!(
                   "Failed to serialize projected Avro schema JSON: {e}"
               ))
           })?;
           Ok(AvroSchema::new(projected_json))
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to