jecsand838 commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2249114536


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -216,34 +330,98 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fp` = None
+    /// - `static_store_mode` = false
     pub fn new() -> Self {
         Self::default()
     }
 
-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> 
Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
-            .with_utf8view(self.utf8_view)
-            .with_strict_mode(self.strict_mode)
-            .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), 
self.utf8_view)
+    fn make_record_decoder<'a>(
+        &self,
+        writer_schema: &AvroSchema<'a>,
+        reader_schema: Option<&AvroSchema<'a>>,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let field_builder = match reader_schema {
+            Some(rs) if !compare_schemas(writer_schema, rs)? => {
+                AvroFieldBuilder::new(writer_schema).with_reader_schema(rs)
+            }
+            Some(rs) => AvroFieldBuilder::new(rs),
+            None => AvroFieldBuilder::new(writer_schema),
+        }
+        .with_utf8view(self.utf8_view)
+        .with_strict_mode(self.strict_mode);
+        let root = field_builder.build()?;
+        RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
     }
 
-    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, 
Decoder), ArrowError> {
-        let header = read_header(reader)?;
-        let record_decoder = if let Some(schema) = &self.schema {
-            self.make_record_decoder(schema)?
-        } else {
-            let avro_schema: Option<AvroSchema<'_>> = header
-                .schema()
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
-            let avro_schema = avro_schema.ok_or_else(|| {
-                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
-            })?;
-            self.make_record_decoder(&avro_schema)?
-        };
-        let decoder = Decoder::new(record_decoder, self.batch_size);
-        Ok((header, decoder))
+    fn make_decoder_with_parts(
+        &self,
+        active_decoder: RecordDecoder,
+        active_fingerprint: Option<Fingerprint>,
+        reader_schema: Option<AvroSchema<'static>>,
+        writer_schema_store: Option<SchemaStore<'static>>,
+    ) -> Decoder {
+        Decoder {
+            batch_size: self.batch_size,
+            remaining_capacity: self.batch_size,
+            active_fingerprint,
+            active_decoder,
+            cache: IndexMap::new(),
+            max_cache_size: self.decoder_cache_size,
+            reader_schema,
+            utf8_view: self.utf8_view,
+            writer_schema_store,
+            strict_mode: self.strict_mode,
+            pending_schema: None,
+        }
+    }
+
+    fn make_decoder(&self, header: Option<&Header>) -> Result<Decoder, 
ArrowError> {
+        match header {
+            Some(hdr) => {
+                let writer_schema = hdr
+                    .schema()
+                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                    .ok_or_else(|| {
+                        ArrowError::ParseError("No Avro schema present in file 
header".into())
+                    })?;
+                let record_decoder =
+                    self.make_record_decoder(&writer_schema, 
self.reader_schema.as_ref())?;
+                Ok(self.make_decoder_with_parts(record_decoder, None, None, 
None))
+            }
+            None => {
+                let reader_schema = self.reader_schema.clone().ok_or_else(|| {
+                    ArrowError::ParseError("Reader schema required for raw 
Avro".into())
+                })?;
+                let (init_fingerprint, initial_decoder) =
+                    if let (Some(schema_store), Some(fingerprint)) =
+                        (&self.writer_schema_store, self.active_fingerprint)
+                    {
+                        // An initial fingerprint is provided, use it to look 
up the first schema.
+                        let writer_schema = 
schema_store.lookup(&fingerprint).ok_or_else(|| {
+                            ArrowError::ParseError(
+                                "Active fingerprint not found in schema 
store".into(),
+                            )
+                        })?;
+                        let decoder =
+                            self.make_record_decoder(writer_schema, 
Some(&reader_schema))?;
+                        (Some(fingerprint), decoder)
+                    } else {
+                        // No initial fingerprint; the first record must 
contain one.
+                        // A decoder is created from the reader schema only.
+                        let decoder = self.make_record_decoder(&reader_schema, 
None)?;
+                        (None, decoder)
+                    };
+                Ok(self.make_decoder_with_parts(
+                    initial_decoder,
+                    init_fingerprint,

Review Comment:
   OK, this is resolved. I can confirm the `init_` prefix is gone in my latest push.
Sorry for making you catch that twice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to