scovich commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2249240259


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -216,34 +369,91 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fingerprint` = None
     pub fn new() -> Self {
         Self::default()
     }
 
-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
-            .with_utf8view(self.utf8_view)
-            .with_strict_mode(self.strict_mode)
-            .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view)
+    fn make_record_decoder<'a>(
+        &self,
+        writer_schema: &AvroSchema<'a>,
+        reader_schema: Option<&AvroSchema<'a>>,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let root = match reader_schema {
+            Some(reader_schema) if !compare_schemas(writer_schema, reader_schema)? => {
+                AvroFieldBuilder::new(writer_schema).with_reader_schema(reader_schema)
+            }
+            _ => AvroFieldBuilder::new(writer_schema),
+        }
+        .with_utf8view(self.utf8_view)
+        .with_strict_mode(self.strict_mode)
+        .build()?;
+        RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
+    }
+
+    fn make_decoder_with_parts(
+        &self,
+        active_decoder: RecordDecoder,
+        active_fingerprint: Option<Fingerprint>,
+        reader_schema: Option<AvroSchema<'static>>,
+        writer_schema_store: Option<SchemaStore<'static>>,
+    ) -> Decoder {
+        #[cfg(feature = "lru")]
+        let capacity = NonZeroUsize::new(self.decoder_cache_size).unwrap_or(NonZeroUsize::MIN); // NonZeroUsize::MIN is 1
+        Decoder {
+            batch_size: self.batch_size,
+            remaining_capacity: self.batch_size,
+            active_fingerprint,
+            active_decoder,
+            #[cfg(feature = "lru")]
+            cache: LruCache::new(capacity),
+            #[cfg(not(feature = "lru"))]
+            cache: IndexMap::new(),
+            max_cache_size: self.decoder_cache_size,
+            reader_schema,
+            utf8_view: self.utf8_view,
+            writer_schema_store,
+            strict_mode: self.strict_mode,
+            pending_schema: None,
+        }
     }
 
-    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
-        let header = read_header(reader)?;
-        let record_decoder = if let Some(schema) = &self.schema {
-            self.make_record_decoder(schema)?
-        } else {
-            let avro_schema: Option<AvroSchema<'_>> = header
+    fn make_decoder(&self, header: Option<&Header>) -> Result<Decoder, ArrowError> {
+        if let Some(hdr) = header {
+            let writer_schema = hdr
                 .schema()
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
-            let avro_schema = avro_schema.ok_or_else(|| {
-                ArrowError::ParseError("No Avro schema present in file header".to_string())
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file header".into())
+                })?;
+            let record_decoder =
+                self.make_record_decoder(&writer_schema, self.reader_schema.as_ref())?;
+            return Ok(self.make_decoder_with_parts(record_decoder, None, None, None));
+        }
+        let writer_schema_store = self.writer_schema_store.as_ref().ok_or_else(|| {
+            ArrowError::ParseError("Writer schema store required for raw Avro".into())
+        })?;
+        let fingerprint = self
+            .active_fingerprint
+            .or_else(|| writer_schema_store.fingerprints().into_iter().next())
+            .ok_or_else(|| {
+                ArrowError::ParseError(
+                    "Writer schema store must contain at least one schema".into(),
+                )
             })?;
-            self.make_record_decoder(&avro_schema)?
-        };
-        let decoder = Decoder::new(record_decoder, self.batch_size);
-        Ok((header, decoder))
+        let writer_schema = writer_schema_store.lookup(&fingerprint).ok_or_else(|| {
+            ArrowError::ParseError("Active fingerprint not found in schema store".into())
+        })?;
+        let record_decoder =
+            self.make_record_decoder(writer_schema, self.reader_schema.as_ref())?;
+        Ok(self.make_decoder_with_parts(
+            record_decoder,
+            Some(fingerprint),
+            self.reader_schema.clone(),
+            self.writer_schema_store.clone(),

Review Comment:
   This is a weird code smell. Some of the state comes from `self` internally, while other state has to be copied manually by the caller even though it's _also_ coming from `self`?
   
   We know that the `writer_schema_store` should only be set for this raw decoder path (see other comment), so it shouldn't be a member of `self` at all; it should instead be passed directly as an argument to `build_decoder`. That takes care of that one.
   
   But what about the reader schema? What's the story behind the smell there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to