scovich commented on code in PR #8006: URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2249240259
########## arrow-avro/src/reader/mod.rs: ########## @@ -216,34 +369,91 @@ impl ReaderBuilder { /// - `batch_size` = 1024 /// - `strict_mode` = false /// - `utf8_view` = false - /// - `schema` = None + /// - `reader_schema` = None + /// - `writer_schema_store` = None + /// - `active_fingerprint` = None pub fn new() -> Self { Self::default() } - fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> { - let root_field = AvroFieldBuilder::new(schema) - .with_utf8view(self.utf8_view) - .with_strict_mode(self.strict_mode) - .build()?; - RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view) + fn make_record_decoder<'a>( + &self, + writer_schema: &AvroSchema<'a>, + reader_schema: Option<&AvroSchema<'a>>, + ) -> Result<RecordDecoder, ArrowError> { + let root = match reader_schema { + Some(reader_schema) if !compare_schemas(writer_schema, reader_schema)? => { + AvroFieldBuilder::new(writer_schema).with_reader_schema(reader_schema) + } + _ => AvroFieldBuilder::new(writer_schema), + } + .with_utf8view(self.utf8_view) + .with_strict_mode(self.strict_mode) + .build()?; + RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view) + } + + fn make_decoder_with_parts( + &self, + active_decoder: RecordDecoder, + active_fingerprint: Option<Fingerprint>, + reader_schema: Option<AvroSchema<'static>>, + writer_schema_store: Option<SchemaStore<'static>>, + ) -> Decoder { + #[cfg(feature = "lru")] + let capacity = NonZeroUsize::new(self.decoder_cache_size).unwrap_or(NonZeroUsize::MIN); // NonZeroUsize::MIN is 1 + Decoder { + batch_size: self.batch_size, + remaining_capacity: self.batch_size, + active_fingerprint, + active_decoder, + #[cfg(feature = "lru")] + cache: LruCache::new(capacity), + #[cfg(not(feature = "lru"))] + cache: IndexMap::new(), + max_cache_size: self.decoder_cache_size, + reader_schema, + utf8_view: self.utf8_view, + writer_schema_store, + strict_mode: self.strict_mode, + pending_schema: None, + } } - 
fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> { - let header = read_header(reader)?; - let record_decoder = if let Some(schema) = &self.schema { - self.make_record_decoder(schema)? - } else { - let avro_schema: Option<AvroSchema<'_>> = header + fn make_decoder(&self, header: Option<&Header>) -> Result<Decoder, ArrowError> { + if let Some(hdr) = header { + let writer_schema = hdr .schema() - .map_err(|e| ArrowError::ExternalError(Box::new(e)))?; - let avro_schema = avro_schema.ok_or_else(|| { - ArrowError::ParseError("No Avro schema present in file header".to_string()) + .map_err(|e| ArrowError::ExternalError(Box::new(e)))? + .ok_or_else(|| { + ArrowError::ParseError("No Avro schema present in file header".into()) + })?; + let record_decoder = + self.make_record_decoder(&writer_schema, self.reader_schema.as_ref())?; + return Ok(self.make_decoder_with_parts(record_decoder, None, None, None)); + } + let writer_schema_store = self.writer_schema_store.as_ref().ok_or_else(|| { + ArrowError::ParseError("Writer schema store required for raw Avro".into()) + })?; + let fingerprint = self + .active_fingerprint + .or_else(|| writer_schema_store.fingerprints().into_iter().next()) + .ok_or_else(|| { + ArrowError::ParseError( + "Writer schema store must contain at least one schema".into(), + ) })?; - self.make_record_decoder(&avro_schema)? - }; - let decoder = Decoder::new(record_decoder, self.batch_size); - Ok((header, decoder)) + let writer_schema = writer_schema_store.lookup(&fingerprint).ok_or_else(|| { + ArrowError::ParseError("Active fingerprint not found in schema store".into()) + })?; + let record_decoder = + self.make_record_decoder(writer_schema, self.reader_schema.as_ref())?; + Ok(self.make_decoder_with_parts( + record_decoder, + Some(fingerprint), + self.reader_schema.clone(), + self.writer_schema_store.clone(), Review Comment: This is a weird code smell. 
Some of the state comes from `self` internally, while other state has to be copied manually by caller even though it's _also_ coming from `self`? We know that the `writer_schema_store` should only be set for this raw decoder path (see other comment), so it shouldn't be a member of `self` at all and instead passed directly as an argument to `build_decoder`. That takes care of that one. But what about the reader schema? What's the story behind the smell there? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org