jecsand838 commented on code in PR #8006: URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2249114536
########## arrow-avro/src/reader/mod.rs: ########## @@ -216,34 +330,98 @@ impl ReaderBuilder { /// - `batch_size` = 1024 /// - `strict_mode` = false /// - `utf8_view` = false - /// - `schema` = None + /// - `reader_schema` = None + /// - `writer_schema_store` = None + /// - `active_fp` = None + /// - `static_store_mode` = false pub fn new() -> Self { Self::default() } - fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> { - let root_field = AvroFieldBuilder::new(schema) - .with_utf8view(self.utf8_view) - .with_strict_mode(self.strict_mode) - .build()?; - RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view) + fn make_record_decoder<'a>( + &self, + writer_schema: &AvroSchema<'a>, + reader_schema: Option<&AvroSchema<'a>>, + ) -> Result<RecordDecoder, ArrowError> { + let field_builder = match reader_schema { + Some(rs) if !compare_schemas(writer_schema, rs)? => { + AvroFieldBuilder::new(writer_schema).with_reader_schema(rs) + } + Some(rs) => AvroFieldBuilder::new(rs), + None => AvroFieldBuilder::new(writer_schema), + } + .with_utf8view(self.utf8_view) + .with_strict_mode(self.strict_mode); + let root = field_builder.build()?; + RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view) } - fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> { - let header = read_header(reader)?; - let record_decoder = if let Some(schema) = &self.schema { - self.make_record_decoder(schema)? - } else { - let avro_schema: Option<AvroSchema<'_>> = header - .schema() - .map_err(|e| ArrowError::ExternalError(Box::new(e)))?; - let avro_schema = avro_schema.ok_or_else(|| { - ArrowError::ParseError("No Avro schema present in file header".to_string()) - })?; - self.make_record_decoder(&avro_schema)? 
- }; - let decoder = Decoder::new(record_decoder, self.batch_size); - Ok((header, decoder)) + fn make_decoder_with_parts( + &self, + active_decoder: RecordDecoder, + active_fingerprint: Option<Fingerprint>, + reader_schema: Option<AvroSchema<'static>>, + writer_schema_store: Option<SchemaStore<'static>>, + ) -> Decoder { + Decoder { + batch_size: self.batch_size, + remaining_capacity: self.batch_size, + active_fingerprint, + active_decoder, + cache: IndexMap::new(), + max_cache_size: self.decoder_cache_size, + reader_schema, + utf8_view: self.utf8_view, + writer_schema_store, + strict_mode: self.strict_mode, + pending_schema: None, + } + } + + fn make_decoder(&self, header: Option<&Header>) -> Result<Decoder, ArrowError> { + match header { + Some(hdr) => { + let writer_schema = hdr + .schema() + .map_err(|e| ArrowError::ExternalError(Box::new(e)))? + .ok_or_else(|| { + ArrowError::ParseError("No Avro schema present in file header".into()) + })?; + let record_decoder = + self.make_record_decoder(&writer_schema, self.reader_schema.as_ref())?; + Ok(self.make_decoder_with_parts(record_decoder, None, None, None)) + } + None => { + let reader_schema = self.reader_schema.clone().ok_or_else(|| { + ArrowError::ParseError("Reader schema required for raw Avro".into()) + })?; + let (init_fingerprint, initial_decoder) = + if let (Some(schema_store), Some(fingerprint)) = + (&self.writer_schema_store, self.active_fingerprint) + { + // An initial fingerprint is provided, use it to look up the first schema. + let writer_schema = schema_store.lookup(&fingerprint).ok_or_else(|| { + ArrowError::ParseError( + "Active fingerprint not found in schema store".into(), + ) + })?; + let decoder = + self.make_record_decoder(writer_schema, Some(&reader_schema))?; + (Some(fingerprint), decoder) + } else { + // No initial fingerprint; the first record must contain one. + // A decoder is created from the reader schema only. 
+ let decoder = self.make_record_decoder(&reader_schema, None)?; + (None, decoder) + }; + Ok(self.make_decoder_with_parts( + initial_decoder, + init_fingerprint, Review Comment: OK, this is resolved. I can confirm the `init_` is gone in my latest push. Sorry for making you catch that twice. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org