jecsand838 commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2249114536
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -216,34 +330,98 @@ impl ReaderBuilder {
/// - `batch_size` = 1024
/// - `strict_mode` = false
/// - `utf8_view` = false
- /// - `schema` = None
+ /// - `reader_schema` = None
+ /// - `writer_schema_store` = None
+ /// - `active_fp` = None
+ /// - `static_store_mode` = false
pub fn new() -> Self {
Self::default()
}
- fn make_record_decoder(&self, schema: &AvroSchema<'_>) ->
Result<RecordDecoder, ArrowError> {
- let root_field = AvroFieldBuilder::new(schema)
- .with_utf8view(self.utf8_view)
- .with_strict_mode(self.strict_mode)
- .build()?;
- RecordDecoder::try_new_with_options(root_field.data_type(),
self.utf8_view)
+ fn make_record_decoder<'a>(
+ &self,
+ writer_schema: &AvroSchema<'a>,
+ reader_schema: Option<&AvroSchema<'a>>,
+ ) -> Result<RecordDecoder, ArrowError> {
+ let field_builder = match reader_schema {
+ Some(rs) if !compare_schemas(writer_schema, rs)? => {
+ AvroFieldBuilder::new(writer_schema).with_reader_schema(rs)
+ }
+ Some(rs) => AvroFieldBuilder::new(rs),
+ None => AvroFieldBuilder::new(writer_schema),
+ }
+ .with_utf8view(self.utf8_view)
+ .with_strict_mode(self.strict_mode);
+ let root = field_builder.build()?;
+ RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
}
- fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header,
Decoder), ArrowError> {
- let header = read_header(reader)?;
- let record_decoder = if let Some(schema) = &self.schema {
- self.make_record_decoder(schema)?
- } else {
- let avro_schema: Option<AvroSchema<'_>> = header
- .schema()
- .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
- let avro_schema = avro_schema.ok_or_else(|| {
- ArrowError::ParseError("No Avro schema present in file header".to_string())
- })?;
- self.make_record_decoder(&avro_schema)?
- };
- let decoder = Decoder::new(record_decoder, self.batch_size);
- Ok((header, decoder))
+ fn make_decoder_with_parts(
+ &self,
+ active_decoder: RecordDecoder,
+ active_fingerprint: Option<Fingerprint>,
+ reader_schema: Option<AvroSchema<'static>>,
+ writer_schema_store: Option<SchemaStore<'static>>,
+ ) -> Decoder {
+ Decoder {
+ batch_size: self.batch_size,
+ remaining_capacity: self.batch_size,
+ active_fingerprint,
+ active_decoder,
+ cache: IndexMap::new(),
+ max_cache_size: self.decoder_cache_size,
+ reader_schema,
+ utf8_view: self.utf8_view,
+ writer_schema_store,
+ strict_mode: self.strict_mode,
+ pending_schema: None,
+ }
+ }
+
+ fn make_decoder(&self, header: Option<&Header>) -> Result<Decoder,
ArrowError> {
+ match header {
+ Some(hdr) => {
+ let writer_schema = hdr
+ .schema()
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+ .ok_or_else(|| {
+ ArrowError::ParseError("No Avro schema present in file header".into())
+ })?;
+ let record_decoder =
+ self.make_record_decoder(&writer_schema,
self.reader_schema.as_ref())?;
+ Ok(self.make_decoder_with_parts(record_decoder, None, None,
None))
+ }
+ None => {
+ let reader_schema = self.reader_schema.clone().ok_or_else(|| {
+ ArrowError::ParseError("Reader schema required for raw Avro".into())
+ })?;
+ let (init_fingerprint, initial_decoder) =
+ if let (Some(schema_store), Some(fingerprint)) =
+ (&self.writer_schema_store, self.active_fingerprint)
+ {
+ // An initial fingerprint is provided, use it to look up the first schema.
+ let writer_schema =
schema_store.lookup(&fingerprint).ok_or_else(|| {
+ ArrowError::ParseError(
+ "Active fingerprint not found in schema store".into(),
+ )
+ })?;
+ let decoder =
+ self.make_record_decoder(writer_schema,
Some(&reader_schema))?;
+ (Some(fingerprint), decoder)
+ } else {
+ // No initial fingerprint; the first record must contain one.
+ // A decoder is created from the reader schema only.
+ let decoder = self.make_record_decoder(&reader_schema,
None)?;
+ (None, decoder)
+ };
+ Ok(self.make_decoder_with_parts(
+ initial_decoder,
+ init_fingerprint,
Review Comment:
OK, this is resolved. I can confirm the `init_` prefix is gone in my latest push.
Sorry for making you catch that twice.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]