scovich commented on code in PR #8006: URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2243631735
########## arrow-avro/src/reader/mod.rs: ########## @@ -216,34 +330,98 @@ impl ReaderBuilder { /// - `batch_size` = 1024 /// - `strict_mode` = false /// - `utf8_view` = false - /// - `schema` = None + /// - `reader_schema` = None + /// - `writer_schema_store` = None + /// - `active_fp` = None + /// - `static_store_mode` = false pub fn new() -> Self { Self::default() } - fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> { - let root_field = AvroFieldBuilder::new(schema) - .with_utf8view(self.utf8_view) - .with_strict_mode(self.strict_mode) - .build()?; - RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view) + fn make_record_decoder<'a>( + &self, + writer_schema: &AvroSchema<'a>, + reader_schema: Option<&AvroSchema<'a>>, + ) -> Result<RecordDecoder, ArrowError> { + let field_builder = match reader_schema { + Some(rs) if !compare_schemas(writer_schema, rs)? => { + AvroFieldBuilder::new(writer_schema).with_reader_schema(rs) + } + Some(rs) => AvroFieldBuilder::new(rs), + None => AvroFieldBuilder::new(writer_schema), + } + .with_utf8view(self.utf8_view) + .with_strict_mode(self.strict_mode); + let root = field_builder.build()?; + RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view) } - fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> { - let header = read_header(reader)?; - let record_decoder = if let Some(schema) = &self.schema { - self.make_record_decoder(schema)? - } else { - let avro_schema: Option<AvroSchema<'_>> = header - .schema() - .map_err(|e| ArrowError::ExternalError(Box::new(e)))?; - let avro_schema = avro_schema.ok_or_else(|| { - ArrowError::ParseError("No Avro schema present in file header".to_string()) - })?; - self.make_record_decoder(&avro_schema)? 
- }; - let decoder = Decoder::new(record_decoder, self.batch_size); - Ok((header, decoder)) + fn make_decoder_with_parts( + &self, + active_decoder: RecordDecoder, + active_fingerprint: Option<Fingerprint>, + reader_schema: Option<AvroSchema<'static>>, + writer_schema_store: Option<SchemaStore<'static>>, + ) -> Decoder { + Decoder { + batch_size: self.batch_size, + remaining_capacity: self.batch_size, + active_fingerprint, + active_decoder, + cache: IndexMap::new(), + max_cache_size: self.decoder_cache_size, + reader_schema, + utf8_view: self.utf8_view, + writer_schema_store, + strict_mode: self.strict_mode, + pending_schema: None, + } + } + + fn make_decoder(&self, header: Option<&Header>) -> Result<Decoder, ArrowError> { + match header { + Some(hdr) => { + let writer_schema = hdr + .schema() + .map_err(|e| ArrowError::ExternalError(Box::new(e)))? + .ok_or_else(|| { + ArrowError::ParseError("No Avro schema present in file header".into()) + })?; + let record_decoder = + self.make_record_decoder(&writer_schema, self.reader_schema.as_ref())?; + Ok(self.make_decoder_with_parts(record_decoder, None, None, None)) + } + None => { + let reader_schema = self.reader_schema.clone().ok_or_else(|| { + ArrowError::ParseError("Reader schema required for raw Avro".into()) + })?; + let (init_fingerprint, initial_decoder) = + if let (Some(schema_store), Some(fingerprint)) = + (&self.writer_schema_store, self.active_fingerprint) + { + // An initial fingerprint is provided, use it to look up the first schema. + let writer_schema = schema_store.lookup(&fingerprint).ok_or_else(|| { + ArrowError::ParseError( + "Active fingerprint not found in schema store".into(), + ) + })?; + let decoder = + self.make_record_decoder(writer_schema, Some(&reader_schema))?; + (Some(fingerprint), decoder) + } else { + // No initial fingerprint; the first record must contain one. + // A decoder is created from the reader schema only. 
+ let decoder = self.make_record_decoder(&reader_schema, None)?; + (None, decoder) + }; + Ok(self.make_decoder_with_parts( + initial_decoder, + init_fingerprint, Review Comment: Seems better to pick one or the other of `init_` vs. `initial_`? (slight preference toward the latter) ########## arrow-avro/src/reader/mod.rs: ########## @@ -520,20 +498,13 @@ impl ReaderBuilder { self.writer_schema_store.as_ref(), self.reader_schema.as_ref(), self.active_fingerprint.as_ref(), - self.static_store_mode, ) { - (Some(_), None, _, _) => Err(ArrowError::ParseError( + (Some(_), None, _) => Err(ArrowError::ParseError( "Reader schema must be set when writer schema store is provided".into(), )), - (None, _, Some(_), _) => Err(ArrowError::ParseError( + (None, _, Some(_)) => Err(ArrowError::ParseError( "Active fingerprint requires a writer schema store".into(), )), - (None, _, _, true) => Err(ArrowError::ParseError( - "static_store_mode=true requires a writer schema store".into(), - )), - (Some(_), _, None, true) => Err(ArrowError::ParseError( - "static_store_mode=true requires an active fingerprint".into(), - )), _ => Ok(()), Review Comment: Instead of defaulting to `Ok`, it seems better to enumerate the valid cases and default (if necessary) to a generic `Err`? On a related note -- have we done the full "truth table" for these values, to determine which combos are definitely valid vs. definitely invalid? Otherwise I worry we might overlook some invalid or ambiguous cases. ########## arrow-avro/src/schema.rs: ########## @@ -260,13 +274,369 @@ pub struct Fixed<'a> { pub attributes: Attributes<'a>, } +/// Supported fingerprint algorithms for Avro schema identification. +/// Currently only `Rabin` is supported, `SHA256` and `MD5` support will come in a future update +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum FingerprintAlgorithm { + /// 64‑bit CRC‑64‑AVRO Rabin fingerprint. + Rabin, +} + +/// A schema fingerprint in one of the supported formats. 
+/// +/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore` +/// instance always stores only one variant, matching its configured +/// `FingerprintAlgorithm`, but the enum makes the API uniform. +/// Currently only `Rabin` is supported +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints> +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Fingerprint { + /// A 64-bit Rabin fingerprint. + Rabin(u64), +} + +/// Allow easy extraction of the algorithm used to create a fingerprint. +impl From<&Fingerprint> for FingerprintAlgorithm { + #[inline] + fn from(fp: &Fingerprint) -> Self { + match fp { + Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin, + } + } +} + +/// Generates a fingerprint for the given `Schema` using the specified `FingerprintAlgorithm`. +#[inline] +pub(crate) fn generate_fingerprint( + schema: &Schema, + hash_type: FingerprintAlgorithm, +) -> Result<Fingerprint, ArrowError> { + let canonical = generate_canonical_form(schema).map_err(|e| { + ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}")) + })?; + match hash_type { + FingerprintAlgorithm::Rabin => { + Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical))) + } + } +} + +/// Generates the 64-bit Rabin fingerprint for the given `Schema`. +/// +/// The fingerprint is computed from the canonical form of the schema. +/// This is also known as `CRC-64-AVRO`. +/// +/// # Returns +/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint. +#[inline] +pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, ArrowError> { + generate_fingerprint(schema, FingerprintAlgorithm::Rabin) +} + +/// Generates the Parsed Canonical Form for the given [`Schema`]. +/// +/// The canonical form is a standardized JSON representation of the schema, +/// primarily used for generating a schema fingerprint for equality checking. 
+/// +/// This form strips attributes that do not affect the schema's identity, +/// such as `doc` fields, `aliases`, and any properties not defined in the +/// Avro specification. +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas> +#[inline] +pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> { + build_canonical(schema, None) +} + +/// An in-memory cache of Avro schemas, indexed by their fingerprint. +/// +/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently. +/// Each schema is associated with a unique [`Fingerprint`], which is generated based +/// on the schema's canonical form and a specific hashing algorithm. +/// +/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin, +/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations. +/// This ensures consistency when generating fingerprints and looking up schemas. +/// All schemas registered will have their fingerprint computed with this algorithm, and +/// lookups must use a matching fingerprint. +/// +/// The lifetime parameter `'a` corresponds to the lifetime of the string slices +/// contained within the stored [`Schema`] objects. This means the `SchemaStore` +/// cannot outlive the data referenced by the schemas it contains. +/// +/// # Examples +/// +/// ```no_run +/// // Create a new store with the default Rabin fingerprinting. +/// use arrow_avro::schema::{PrimitiveType, Schema, SchemaStore, TypeName}; +/// +/// let mut store = SchemaStore::new(); +/// let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String)); +/// // Register the schema to get its fingerprint. +/// let fingerprint = store.register(schema.clone()).unwrap(); +/// // Use the fingerprint to look up the schema. 
+/// let retrieved_schema = store.lookup(&fingerprint).cloned(); +/// assert_eq!(retrieved_schema, Some(schema)); +/// ``` +#[derive(Debug, Clone)] +pub struct SchemaStore<'a> { + /// The hashing algorithm used for generating fingerprints. + fingerprint_algorithm: FingerprintAlgorithm, + /// A map from a schema's fingerprint to the schema itself. + schemas: HashMap<Fingerprint, Schema<'a>>, +} + +impl<'a> TryFrom<&'a [Schema<'a>]> for SchemaStore<'a> { + type Error = ArrowError; + + /// Creates a `SchemaStore` from a slice of schemas. + /// Each schema in the slice is registered with the new store. + fn try_from(schemas: &'a [Schema<'a>]) -> Result<Self, Self::Error> { + let mut store = SchemaStore::new(); + for schema in schemas { + store.register(schema.clone())?; + } + Ok(store) + } +} + +impl<'a> Default for SchemaStore<'a> { + fn default() -> Self { + Self { + fingerprint_algorithm: FingerprintAlgorithm::Rabin, + schemas: HashMap::new(), + } + } +} + +impl<'a> SchemaStore<'a> { + /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin). + pub fn new() -> Self { + Self::default() + } + + /// Registers a schema with the store and returns its fingerprint. + /// + /// A fingerprint is calculated for the given schema using the store's configured + /// hash type. If a schema with the same fingerprint does not already exist in the + /// store, the new schema is inserted. If the fingerprint already exists, the + /// existing schema is not overwritten. + /// + /// # Arguments + /// + /// * `schema` - The schema to register. + /// + /// # Returns + /// + /// A `Result` containing the `Fingerprint` of the schema if successful, + /// or an `ArrowError` on failure. 
+ pub fn register(&mut self, schema: Schema<'a>) -> Result<Fingerprint, ArrowError> { + let fp = generate_fingerprint(&schema, self.fingerprint_algorithm)?; + match self.schemas.entry(fp) { + Entry::Occupied(entry) => { + if entry.get() != &schema { + return Err(ArrowError::ComputeError(format!( + "Schema fingerprint collision detected for fingerprint {fp:?}" + ))); + } + } + Entry::Vacant(entry) => { + entry.insert(schema); + } + } + Ok(fp) + } + + /// Looks up a schema by its `Fingerprint`. + /// + /// # Arguments + /// + /// * `fp` - A reference to the `Fingerprint` of the schema to look up. + /// + /// # Returns + /// + /// An `Option` containing a clone of the `Schema` if found, otherwise `None`. + pub fn lookup(&self, fp: &Fingerprint) -> Option<&Schema<'a>> { + self.schemas.get(fp) + } + + /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting. + pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm { + self.fingerprint_algorithm + } +} + +#[inline] +fn quote(s: &str) -> Result<String, ArrowError> { + serde_json::to_string(s) + .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}"))) +} + +fn make_fullname(name: &str, namespace_attr: Option<&str>, enclosing_ns: Option<&str>) -> String { + match namespace_attr.or(enclosing_ns) { + Some(ns) if !name.contains('.') => format!("{ns}.{name}"), + _ => name.to_string(), + } +} + +fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> { + #[inline] + fn prim_str(pt: &PrimitiveType) -> &'static str { + match pt { + PrimitiveType::Null => "null", + PrimitiveType::Boolean => "boolean", + PrimitiveType::Int => "int", + PrimitiveType::Long => "long", + PrimitiveType::Float => "float", + PrimitiveType::Double => "double", + PrimitiveType::Bytes => "bytes", + PrimitiveType::String => "string", + } + } + Ok(match schema { + Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. 
}) => match tn { Review Comment: Do we _really_ need `r#type` raw token syntax? Can we not use attributes on `Type::type` to give it a valid rust name while serde and strum still emit the correct json for it? ########## arrow-avro/src/schema.rs: ########## @@ -260,13 +274,369 @@ pub struct Fixed<'a> { pub attributes: Attributes<'a>, } +/// Supported fingerprint algorithms for Avro schema identification. +/// Currently only `Rabin` is supported, `SHA256` and `MD5` support will come in a future update +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum FingerprintAlgorithm { + /// 64‑bit CRC‑64‑AVRO Rabin fingerprint. + Rabin, +} + +/// A schema fingerprint in one of the supported formats. +/// +/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore` +/// instance always stores only one variant, matching its configured +/// `FingerprintAlgorithm`, but the enum makes the API uniform. +/// Currently only `Rabin` is supported +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints> +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Fingerprint { + /// A 64-bit Rabin fingerprint. + Rabin(u64), +} + +/// Allow easy extraction of the algorithm used to create a fingerprint. +impl From<&Fingerprint> for FingerprintAlgorithm { + #[inline] + fn from(fp: &Fingerprint) -> Self { + match fp { + Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin, + } + } +} + +/// Generates a fingerprint for the given `Schema` using the specified `FingerprintAlgorithm`. 
+#[inline] +pub(crate) fn generate_fingerprint( + schema: &Schema, + hash_type: FingerprintAlgorithm, +) -> Result<Fingerprint, ArrowError> { + let canonical = generate_canonical_form(schema).map_err(|e| { + ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}")) + })?; + match hash_type { + FingerprintAlgorithm::Rabin => { + Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical))) + } + } +} + +/// Generates the 64-bit Rabin fingerprint for the given `Schema`. +/// +/// The fingerprint is computed from the canonical form of the schema. +/// This is also known as `CRC-64-AVRO`. +/// +/// # Returns +/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint. +#[inline] +pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, ArrowError> { + generate_fingerprint(schema, FingerprintAlgorithm::Rabin) +} + +/// Generates the Parsed Canonical Form for the given [`Schema`]. +/// +/// The canonical form is a standardized JSON representation of the schema, +/// primarily used for generating a schema fingerprint for equality checking. +/// +/// This form strips attributes that do not affect the schema's identity, +/// such as `doc` fields, `aliases`, and any properties not defined in the +/// Avro specification. +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas> +#[inline] +pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> { + build_canonical(schema, None) +} + +/// An in-memory cache of Avro schemas, indexed by their fingerprint. +/// +/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently. +/// Each schema is associated with a unique [`Fingerprint`], which is generated based +/// on the schema's canonical form and a specific hashing algorithm. 
+/// +/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin, +/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations. +/// This ensures consistency when generating fingerprints and looking up schemas. +/// All schemas registered will have their fingerprint computed with this algorithm, and +/// lookups must use a matching fingerprint. +/// +/// The lifetime parameter `'a` corresponds to the lifetime of the string slices +/// contained within the stored [`Schema`] objects. This means the `SchemaStore` +/// cannot outlive the data referenced by the schemas it contains. +/// +/// # Examples +/// +/// ```no_run +/// // Create a new store with the default Rabin fingerprinting. +/// use arrow_avro::schema::{PrimitiveType, Schema, SchemaStore, TypeName}; +/// +/// let mut store = SchemaStore::new(); +/// let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String)); +/// // Register the schema to get its fingerprint. +/// let fingerprint = store.register(schema.clone()).unwrap(); +/// // Use the fingerprint to look up the schema. +/// let retrieved_schema = store.lookup(&fingerprint).cloned(); +/// assert_eq!(retrieved_schema, Some(schema)); +/// ``` +#[derive(Debug, Clone)] +pub struct SchemaStore<'a> { + /// The hashing algorithm used for generating fingerprints. + fingerprint_algorithm: FingerprintAlgorithm, + /// A map from a schema's fingerprint to the schema itself. + schemas: HashMap<Fingerprint, Schema<'a>>, +} + +impl<'a> TryFrom<&'a [Schema<'a>]> for SchemaStore<'a> { + type Error = ArrowError; + + /// Creates a `SchemaStore` from a slice of schemas. + /// Each schema in the slice is registered with the new store. 
+ fn try_from(schemas: &'a [Schema<'a>]) -> Result<Self, Self::Error> { + let mut store = SchemaStore::new(); + for schema in schemas { + store.register(schema.clone())?; + } + Ok(store) + } +} + +impl<'a> Default for SchemaStore<'a> { + fn default() -> Self { + Self { + fingerprint_algorithm: FingerprintAlgorithm::Rabin, + schemas: HashMap::new(), + } + } +} + +impl<'a> SchemaStore<'a> { + /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin). + pub fn new() -> Self { + Self::default() + } + + /// Registers a schema with the store and returns its fingerprint. + /// + /// A fingerprint is calculated for the given schema using the store's configured + /// hash type. If a schema with the same fingerprint does not already exist in the + /// store, the new schema is inserted. If the fingerprint already exists, the + /// existing schema is not overwritten. + /// + /// # Arguments + /// + /// * `schema` - The schema to register. + /// + /// # Returns + /// + /// A `Result` containing the `Fingerprint` of the schema if successful, + /// or an `ArrowError` on failure. + pub fn register(&mut self, schema: Schema<'a>) -> Result<Fingerprint, ArrowError> { + let fp = generate_fingerprint(&schema, self.fingerprint_algorithm)?; + match self.schemas.entry(fp) { + Entry::Occupied(entry) => { + if entry.get() != &schema { + return Err(ArrowError::ComputeError(format!( + "Schema fingerprint collision detected for fingerprint {fp:?}" + ))); + } + } + Entry::Vacant(entry) => { + entry.insert(schema); + } + } + Ok(fp) + } + + /// Looks up a schema by its `Fingerprint`. + /// + /// # Arguments + /// + /// * `fp` - A reference to the `Fingerprint` of the schema to look up. + /// + /// # Returns + /// + /// An `Option` containing a clone of the `Schema` if found, otherwise `None`. 
+ pub fn lookup(&self, fp: &Fingerprint) -> Option<&Schema<'a>> { + self.schemas.get(fp) + } + + /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting. + pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm { + self.fingerprint_algorithm + } +} + +#[inline] +fn quote(s: &str) -> Result<String, ArrowError> { + serde_json::to_string(s) + .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}"))) +} + +fn make_fullname(name: &str, namespace_attr: Option<&str>, enclosing_ns: Option<&str>) -> String { + match namespace_attr.or(enclosing_ns) { + Some(ns) if !name.contains('.') => format!("{ns}.{name}"), + _ => name.to_string(), Review Comment: What are the rules for formatting complex field names in avro? For example, is ``a.`b.c`.d`` (a three-deep path) allowed? What about `a."hi".b`, etc.? Asking because the default match arm seems a bit questionable: it implicitly covers `Some(ns) if name.contains('.')`, but then ignores `ns`. ########## arrow-avro/src/reader/mod.rs: ########## @@ -154,39 +167,134 @@ impl Decoder { /// /// Returns the number of bytes consumed.
- // We add `consumed` (which can be 0) to `total_consumed`. - total_consumed += consumed; - self.decoded_rows += 1; + let hash_type = self.writer_schema_store.as_ref().map_or( + FingerprintAlgorithm::Rabin, + SchemaStore::fingerprint_algorithm, + ); + while total_consumed < data.len() && self.remaining_capacity > 0 { + if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? { + // A batch is complete when its `remaining_capacity` is 0. It may be completed early if + // a schema change is detected or there are insufficient bytes to read the next prefix. + // A schema change requires a new batch. + total_consumed += prefix_bytes; + break; + } + let n = self.active_decoder.decode(&data[total_consumed..], 1)?; + total_consumed += n; + self.remaining_capacity -= 1; } Ok(total_consumed) } /// Produce a `RecordBatch` if at least one row is fully decoded, returning /// `Ok(None)` if no new rows are available. pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> { - if self.decoded_rows == 0 { - Ok(None) - } else { - let batch = self.record_decoder.flush()?; - self.decoded_rows = 0; - Ok(Some(batch)) + if self.remaining_capacity == self.batch_size { + return Ok(None); + } + let batch = self.active_decoder.flush()?; + self.remaining_capacity = self.batch_size; + // Apply a pending schema switch if one is staged + if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() { + // Cache the old decoder before replacing it + if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) { + let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder); + self.cache.shift_remove(&old_fingerprint); + self.cache.insert(old_fingerprint, old_decoder); + if self.cache.len() > self.max_cache_size { + self.cache.shift_remove_index(0); + } + } else { + self.active_decoder = new_decoder; + } + } + Ok(Some(batch)) + } + + #[inline] + fn handle_prefix( + &mut self, + buf: &[u8], + hash_type: 
FingerprintAlgorithm, + ) -> Result<Option<usize>, ArrowError> { + if self.writer_schema_store.is_none() || !buf.starts_with(&SINGLE_OBJECT_MAGIC) { + return Ok(None); + } + let full_len = prefix_len(hash_type); + if buf.len() < full_len { + return Ok(Some(0)); // Not enough data to read the full fingerprint + } + let fp_bytes = &buf[2..full_len]; + let new_fp = match hash_type { + FingerprintAlgorithm::Rabin => { + let Ok(bytes) = <[u8; 8]>::try_from(fp_bytes) else { + return Err(ArrowError::ParseError(format!( + "Invalid Rabin fingerprint length, expected 8, got {}", + fp_bytes.len() + ))); + }; + Fingerprint::Rabin(u64::from_le_bytes(bytes)) + } + }; + // If the fingerprint indicates a schema change, prepare to switch decoders. + if self.active_fingerprint != Some(new_fp) { + self.prepare_schema_switch(new_fp)?; + // If there are already decoded rows, we must flush them first. + // Forcing the batch to be full ensures `flush` is called next. + if self.remaining_capacity < self.batch_size { + self.remaining_capacity = 0; + } } + Ok(Some(full_len)) + } + + fn prepare_schema_switch(&mut self, new_fingerprint: Fingerprint) -> Result<(), ArrowError> { + let new_decoder = if let Some(decoder) = self.cache.shift_remove(&new_fingerprint) { + decoder + } else { Review Comment: ```suggestion let new_decoder = self.cache.shift_remove(&new_fingerprint).unwrap_or_else(|| { ``` ########## arrow-avro/src/reader/mod.rs: ########## @@ -154,39 +167,134 @@ impl Decoder { /// /// Returns the number of bytes consumed. 
pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> { + if self.active_fingerprint.is_none() + && self.writer_schema_store.is_some() + && !data.starts_with(&SINGLE_OBJECT_MAGIC) + { + return Err(ArrowError::ParseError( + "Expected single‑object encoding fingerprint prefix for first message \ + (writer_schema_store is set but active_fingerprint is None)" + .into(), + )); + } let mut total_consumed = 0usize; - while total_consumed < data.len() && self.decoded_rows < self.batch_size { - let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?; - // A successful call to record_decoder.decode means one row was decoded. - // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record. - // We increment `decoded_rows` to mark progress and avoid an infinite loop. - // We add `consumed` (which can be 0) to `total_consumed`. - total_consumed += consumed; - self.decoded_rows += 1; + let hash_type = self.writer_schema_store.as_ref().map_or( + FingerprintAlgorithm::Rabin, + SchemaStore::fingerprint_algorithm, + ); + while total_consumed < data.len() && self.remaining_capacity > 0 { + if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? { + // A batch is complete when its `remaining_capacity` is 0. It may be completed early if + // a schema change is detected or there are insufficient bytes to read the next prefix. + // A schema change requires a new batch. + total_consumed += prefix_bytes; + break; + } + let n = self.active_decoder.decode(&data[total_consumed..], 1)?; + total_consumed += n; + self.remaining_capacity -= 1; } Ok(total_consumed) } /// Produce a `RecordBatch` if at least one row is fully decoded, returning /// `Ok(None)` if no new rows are available. 
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> { - if self.decoded_rows == 0 { - Ok(None) - } else { - let batch = self.record_decoder.flush()?; - self.decoded_rows = 0; - Ok(Some(batch)) + if self.remaining_capacity == self.batch_size { + return Ok(None); + } + let batch = self.active_decoder.flush()?; + self.remaining_capacity = self.batch_size; + // Apply a pending schema switch if one is staged + if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() { + // Cache the old decoder before replacing it + if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) { + let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder); + self.cache.shift_remove(&old_fingerprint); + self.cache.insert(old_fingerprint, old_decoder); + if self.cache.len() > self.max_cache_size { + self.cache.shift_remove_index(0); + } + } else { + self.active_decoder = new_decoder; + } + } + Ok(Some(batch)) + } + + #[inline] + fn handle_prefix( + &mut self, + buf: &[u8], + hash_type: FingerprintAlgorithm, + ) -> Result<Option<usize>, ArrowError> { + if self.writer_schema_store.is_none() || !buf.starts_with(&SINGLE_OBJECT_MAGIC) { + return Ok(None); + } + let full_len = prefix_len(hash_type); + if buf.len() < full_len { + return Ok(Some(0)); // Not enough data to read the full fingerprint + } + let fp_bytes = &buf[2..full_len]; + let new_fp = match hash_type { + FingerprintAlgorithm::Rabin => { + let Ok(bytes) = <[u8; 8]>::try_from(fp_bytes) else { + return Err(ArrowError::ParseError(format!( + "Invalid Rabin fingerprint length, expected 8, got {}", + fp_bytes.len() + ))); + }; + Fingerprint::Rabin(u64::from_le_bytes(bytes)) + } + }; + // If the fingerprint indicates a schema change, prepare to switch decoders. + if self.active_fingerprint != Some(new_fp) { + self.prepare_schema_switch(new_fp)?; + // If there are already decoded rows, we must flush them first. 
+ // Forcing the batch to be full ensures `flush` is called next. + if self.remaining_capacity < self.batch_size { + self.remaining_capacity = 0; + } } + Ok(Some(full_len)) + } + + fn prepare_schema_switch(&mut self, new_fingerprint: Fingerprint) -> Result<(), ArrowError> { + let new_decoder = if let Some(decoder) = self.cache.shift_remove(&new_fingerprint) { + decoder + } else { Review Comment: Oh... all those `?` complicate things a lot. Never mind. ########## arrow-avro/src/reader/mod.rs: ########## @@ -154,39 +167,134 @@ impl Decoder { /// /// Returns the number of bytes consumed. pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> { + if self.active_fingerprint.is_none() + && self.writer_schema_store.is_some() + && !data.starts_with(&SINGLE_OBJECT_MAGIC) + { + return Err(ArrowError::ParseError( + "Expected single‑object encoding fingerprint prefix for first message \ + (writer_schema_store is set but active_fingerprint is None)" + .into(), + )); + } let mut total_consumed = 0usize; - while total_consumed < data.len() && self.decoded_rows < self.batch_size { - let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?; - // A successful call to record_decoder.decode means one row was decoded. - // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record. - // We increment `decoded_rows` to mark progress and avoid an infinite loop. - // We add `consumed` (which can be 0) to `total_consumed`. - total_consumed += consumed; - self.decoded_rows += 1; + let hash_type = self.writer_schema_store.as_ref().map_or( + FingerprintAlgorithm::Rabin, + SchemaStore::fingerprint_algorithm, + ); + while total_consumed < data.len() && self.remaining_capacity > 0 { + if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? { + // A batch is complete when its `remaining_capacity` is 0. It may be completed early if + // a schema change is detected or there are insufficient bytes to read the next prefix. 
+ // A schema change requires a new batch. + total_consumed += prefix_bytes; + break; + } + let n = self.active_decoder.decode(&data[total_consumed..], 1)?; + total_consumed += n; + self.remaining_capacity -= 1; } Ok(total_consumed) } /// Produce a `RecordBatch` if at least one row is fully decoded, returning /// `Ok(None)` if no new rows are available. pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> { - if self.decoded_rows == 0 { - Ok(None) - } else { - let batch = self.record_decoder.flush()?; - self.decoded_rows = 0; - Ok(Some(batch)) + if self.remaining_capacity == self.batch_size { + return Ok(None); + } + let batch = self.active_decoder.flush()?; + self.remaining_capacity = self.batch_size; + // Apply a pending schema switch if one is staged + if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() { + // Cache the old decoder before replacing it + if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) { + let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder); + self.cache.shift_remove(&old_fingerprint); + self.cache.insert(old_fingerprint, old_decoder); + if self.cache.len() > self.max_cache_size { + self.cache.shift_remove_index(0); + } + } else { + self.active_decoder = new_decoder; + } + } + Ok(Some(batch)) + } + + #[inline] + fn handle_prefix( + &mut self, + buf: &[u8], + hash_type: FingerprintAlgorithm, + ) -> Result<Option<usize>, ArrowError> { + if self.writer_schema_store.is_none() || !buf.starts_with(&SINGLE_OBJECT_MAGIC) { + return Ok(None); + } + let full_len = prefix_len(hash_type); + if buf.len() < full_len { + return Ok(Some(0)); // Not enough data to read the full fingerprint + } + let fp_bytes = &buf[2..full_len]; + let new_fp = match hash_type { + FingerprintAlgorithm::Rabin => { + let Ok(bytes) = <[u8; 8]>::try_from(fp_bytes) else { + return Err(ArrowError::ParseError(format!( + "Invalid Rabin fingerprint length, expected 8, got {}", + fp_bytes.len() + ))); + 
}; + Fingerprint::Rabin(u64::from_le_bytes(bytes)) + } + }; + // If the fingerprint indicates a schema change, prepare to switch decoders. + if self.active_fingerprint != Some(new_fp) { + self.prepare_schema_switch(new_fp)?; + // If there are already decoded rows, we must flush them first. + // Forcing the batch to be full ensures `flush` is called next. + if self.remaining_capacity < self.batch_size { + self.remaining_capacity = 0; + } } + Ok(Some(full_len)) + } + + fn prepare_schema_switch(&mut self, new_fingerprint: Fingerprint) -> Result<(), ArrowError> { + let new_decoder = if let Some(decoder) = self.cache.shift_remove(&new_fingerprint) { + decoder + } else { Review Comment: Actually, what if this helper method only dealt with the decoder, and the (single) caller installed it? Instead of: ```rust self.prepare_schema_switch(new_fp)?; ``` do ```rust let new_decoder = match self.cache.shift_remove(&new_fp) { Some(decoder) => decoder, None => self.create_decoder_for(new_fp)?, }; self.pending_schema = Some((new_fp, new_decoder)); ``` Where `create_decoder_for` is the logic from this `else` block? ########## arrow-avro/src/reader/mod.rs: ########## @@ -168,48 +167,58 @@ impl Decoder { /// /// Returns the number of bytes consumed.
pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> { + if self.active_fingerprint.is_none() + && self.writer_schema_store.is_some() + && !data.starts_with(&SINGLE_OBJECT_MAGIC) + { + return Err(ArrowError::ParseError( + "Expected single‑object encoding fingerprint prefix for first message \ + (writer_schema_store is set but active_fingerprint is None)" + .into(), + )); + } let mut total_consumed = 0usize; - let hash_type = self.schema_store.as_ref().map_or( + let hash_type = self.writer_schema_store.as_ref().map_or( FingerprintAlgorithm::Rabin, SchemaStore::fingerprint_algorithm, ); - while total_consumed < data.len() && self.decoded_rows < self.batch_size { + while total_consumed < data.len() && self.remaining_capacity > 0 { if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? { - // Schema change detected (> 0) or there were insufficient bytes to read the next prefix (= 0). - // If the former, the batch must end because the next record has a different schema. - // If the latter, batch ends because the caller needs to fetch more bytes. + // A batch is complete when its `remaining_capacity` is 0. It may be completed early if + // a schema change is detected or there are insufficient bytes to read the next prefix. + // A schema change requires a new batch. Review Comment: This comment seems a bit misplaced? Should it be at L193 below? ########## arrow-avro/src/schema.rs: ########## @@ -260,13 +274,369 @@ pub struct Fixed<'a> { pub attributes: Attributes<'a>, } +/// Supported fingerprint algorithms for Avro schema identification. +/// Currently only `Rabin` is supported, `SHA256` and `MD5` support will come in a future update +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum FingerprintAlgorithm { + /// 64‑bit CRC‑64‑AVRO Rabin fingerprint. + Rabin, +} + +/// A schema fingerprint in one of the supported formats. +/// +/// This is used as the key inside `SchemaStore` `HashMap`. 
Each `SchemaStore` +/// instance always stores only one variant, matching its configured +/// `FingerprintAlgorithm`, but the enum makes the API uniform. +/// Currently only `Rabin` is supported +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints> +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Fingerprint { + /// A 64-bit Rabin fingerprint. + Rabin(u64), +} + +/// Allow easy extraction of the algorithm used to create a fingerprint. +impl From<&Fingerprint> for FingerprintAlgorithm { + #[inline] + fn from(fp: &Fingerprint) -> Self { + match fp { + Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin, + } + } +} + +/// Generates a fingerprint for the given `Schema` using the specified `FingerprintAlgorithm`. +#[inline] +pub(crate) fn generate_fingerprint( + schema: &Schema, + hash_type: FingerprintAlgorithm, +) -> Result<Fingerprint, ArrowError> { + let canonical = generate_canonical_form(schema).map_err(|e| { + ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}")) + })?; + match hash_type { + FingerprintAlgorithm::Rabin => { + Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical))) + } + } +} + +/// Generates the 64-bit Rabin fingerprint for the given `Schema`. +/// +/// The fingerprint is computed from the canonical form of the schema. +/// This is also known as `CRC-64-AVRO`. +/// +/// # Returns +/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint. +#[inline] +pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, ArrowError> { + generate_fingerprint(schema, FingerprintAlgorithm::Rabin) +} + +/// Generates the Parsed Canonical Form for the given [`Schema`]. +/// +/// The canonical form is a standardized JSON representation of the schema, +/// primarily used for generating a schema fingerprint for equality checking. 
+/// +/// This form strips attributes that do not affect the schema's identity, +/// such as `doc` fields, `aliases`, and any properties not defined in the +/// Avro specification. +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas> +#[inline] +pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> { + build_canonical(schema, None) +} + +/// An in-memory cache of Avro schemas, indexed by their fingerprint. +/// +/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently. +/// Each schema is associated with a unique [`Fingerprint`], which is generated based +/// on the schema's canonical form and a specific hashing algorithm. +/// +/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin, +/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations. +/// This ensures consistency when generating fingerprints and looking up schemas. +/// All schemas registered will have their fingerprint computed with this algorithm, and +/// lookups must use a matching fingerprint. +/// +/// The lifetime parameter `'a` corresponds to the lifetime of the string slices +/// contained within the stored [`Schema`] objects. This means the `SchemaStore` +/// cannot outlive the data referenced by the schemas it contains. +/// +/// # Examples +/// +/// ```no_run +/// // Create a new store with the default Rabin fingerprinting. +/// use arrow_avro::schema::{PrimitiveType, Schema, SchemaStore, TypeName}; +/// +/// let mut store = SchemaStore::new(); +/// let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String)); +/// // Register the schema to get its fingerprint. +/// let fingerprint = store.register(schema.clone()).unwrap(); +/// // Use the fingerprint to look up the schema. 
+/// let retrieved_schema = store.lookup(&fingerprint).cloned(); +/// assert_eq!(retrieved_schema, Some(schema)); +/// ``` +#[derive(Debug, Clone)] +pub struct SchemaStore<'a> { + /// The hashing algorithm used for generating fingerprints. + fingerprint_algorithm: FingerprintAlgorithm, + /// A map from a schema's fingerprint to the schema itself. + schemas: HashMap<Fingerprint, Schema<'a>>, +} + +impl<'a> TryFrom<&'a [Schema<'a>]> for SchemaStore<'a> { + type Error = ArrowError; + + /// Creates a `SchemaStore` from a slice of schemas. + /// Each schema in the slice is registered with the new store. + fn try_from(schemas: &'a [Schema<'a>]) -> Result<Self, Self::Error> { + let mut store = SchemaStore::new(); + for schema in schemas { + store.register(schema.clone())?; + } + Ok(store) + } +} + +impl<'a> Default for SchemaStore<'a> { + fn default() -> Self { + Self { + fingerprint_algorithm: FingerprintAlgorithm::Rabin, + schemas: HashMap::new(), + } + } +} + +impl<'a> SchemaStore<'a> { + /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin). + pub fn new() -> Self { + Self::default() + } + + /// Registers a schema with the store and returns its fingerprint. + /// + /// A fingerprint is calculated for the given schema using the store's configured + /// hash type. If a schema with the same fingerprint does not already exist in the + /// store, the new schema is inserted. If the fingerprint already exists, the + /// existing schema is not overwritten. + /// + /// # Arguments + /// + /// * `schema` - The schema to register. + /// + /// # Returns + /// + /// A `Result` containing the `Fingerprint` of the schema if successful, + /// or an `ArrowError` on failure. 
+ pub fn register(&mut self, schema: Schema<'a>) -> Result<Fingerprint, ArrowError> { + let fp = generate_fingerprint(&schema, self.fingerprint_algorithm)?; + match self.schemas.entry(fp) { + Entry::Occupied(entry) => { + if entry.get() != &schema { + return Err(ArrowError::ComputeError(format!( + "Schema fingerprint collision detected for fingerprint {fp:?}" + ))); + } + } + Entry::Vacant(entry) => { + entry.insert(schema); + } + } + Ok(fp) + } + + /// Looks up a schema by its `Fingerprint`. + /// + /// # Arguments + /// + /// * `fp` - A reference to the `Fingerprint` of the schema to look up. + /// + /// # Returns + /// + /// An `Option` containing a clone of the `Schema` if found, otherwise `None`. + pub fn lookup(&self, fp: &Fingerprint) -> Option<&Schema<'a>> { + self.schemas.get(fp) + } + + /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting. + pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm { + self.fingerprint_algorithm + } +} + +#[inline] +fn quote(s: &str) -> Result<String, ArrowError> { + serde_json::to_string(s) + .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}"))) +} + +fn make_fullname(name: &str, namespace_attr: Option<&str>, enclosing_ns: Option<&str>) -> String { + match namespace_attr.or(enclosing_ns) { + Some(ns) if !name.contains('.') => format!("{ns}.{name}"), + _ => name.to_string(), + } +} + +fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> { + #[inline] + fn prim_str(pt: &PrimitiveType) -> &'static str { + match pt { + PrimitiveType::Null => "null", + PrimitiveType::Boolean => "boolean", + PrimitiveType::Int => "int", + PrimitiveType::Long => "long", + PrimitiveType::Float => "float", + PrimitiveType::Double => "double", + PrimitiveType::Bytes => "bytes", + PrimitiveType::String => "string", + } + } Review Comment: Seems like a good use for strum or similar crate, to derive this automatically? 
########## arrow-avro/src/reader/mod.rs: ########## @@ -246,8 +252,8 @@ impl Decoder { self.prepare_schema_switch(new_fp)?; // If there are already decoded rows, we must flush them first. // Forcing the batch to be full ensures `flush` is called next. - if self.decoded_rows > 0 { - self.decoded_rows = self.batch_size; + if self.remaining_capacity < self.batch_size { Review Comment: Interesting.. it's possible for two schema changes to come with no rows in between? And this check prevents emitting an empty batch in that corner case? ########## arrow-avro/src/reader/mod.rs: ########## @@ -272,17 +450,70 @@ impl ReaderBuilder { self } - /// Sets the Avro schema. + /// Sets the Avro reader schema. /// /// If a schema is not provided, the schema will be read from the Avro file header. - pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self { - self.schema = Some(schema); + pub fn with_reader_schema(mut self, reader_schema: AvroSchema<'static>) -> Self { + self.reader_schema = Some(reader_schema); self } + /// Sets the `SchemaStore` used for resolving writer schemas. + /// + /// This is necessary when decoding single-object encoded data that identifies + /// schemas by a fingerprint. The store allows the decoder to look up the + /// full writer schema from a fingerprint embedded in the data. + /// + /// Defaults to `None`. + pub fn with_writer_schema_store(mut self, store: SchemaStore<'static>) -> Self { + self.writer_schema_store = Some(store); + self + } + + /// Sets the initial schema fingerprint for decoding single-object encoded data. + /// + /// This is useful when the data stream does not begin with a schema definition + /// or fingerprint, allowing the decoder to start with a known schema from the + /// `SchemaStore`. + /// + /// Defaults to `None`. + pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self { + self.active_fingerprint = Some(fp); + self + } + + /// Set the maximum number of decoders to cache. 
+ /// + /// When dealing with Avro files that contain multiple schemas, we may need to switch + /// between different decoders. This cache avoids rebuilding them from scratch every time. + /// + /// Defaults to `20`. + pub fn with_max_decoder_cache_size(mut self, n: usize) -> Self { + self.decoder_cache_size = n; + self + } + + fn validate(&self) -> Result<(), ArrowError> { + match ( + self.writer_schema_store.as_ref(), + self.reader_schema.as_ref(), + self.active_fingerprint.as_ref(), + ) { + (Some(_), None, _) => Err(ArrowError::ParseError( + "Reader schema must be set when writer schema store is provided".into(), + )), + (None, _, Some(_)) => Err(ArrowError::ParseError( + "Active fingerprint requires a writer schema store".into(), + )), + _ => Ok(()), + } + } + /// Create a [`Reader`] from this builder and a `BufRead` pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> { - let (header, decoder) = self.build_impl(&mut reader)?; + self.validate()?; + let header = read_header(&mut reader)?; + let decoder = self.make_decoder(Some(&header))?; Ok(Reader { reader, header, Review Comment: why not just: ```suggestion Ok(Reader { reader: read_header(&mut reader)?, header: self.make_decoder(Some(&header))?, ``` ########## arrow-avro/src/reader/mod.rs: ########## @@ -154,39 +167,134 @@ impl Decoder { /// /// Returns the number of bytes consumed. 
pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> { + if self.active_fingerprint.is_none() + && self.writer_schema_store.is_some() + && !data.starts_with(&SINGLE_OBJECT_MAGIC) + { + return Err(ArrowError::ParseError( + "Expected single‑object encoding fingerprint prefix for first message \ + (writer_schema_store is set but active_fingerprint is None)" + .into(), + )); + } let mut total_consumed = 0usize; - while total_consumed < data.len() && self.decoded_rows < self.batch_size { - let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?; - // A successful call to record_decoder.decode means one row was decoded. - // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record. - // We increment `decoded_rows` to mark progress and avoid an infinite loop. - // We add `consumed` (which can be 0) to `total_consumed`. - total_consumed += consumed; - self.decoded_rows += 1; + let hash_type = self.writer_schema_store.as_ref().map_or( + FingerprintAlgorithm::Rabin, + SchemaStore::fingerprint_algorithm, + ); + while total_consumed < data.len() && self.remaining_capacity > 0 { + if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? { + // A batch is complete when its `remaining_capacity` is 0. It may be completed early if + // a schema change is detected or there are insufficient bytes to read the next prefix. + // A schema change requires a new batch. + total_consumed += prefix_bytes; + break; + } + let n = self.active_decoder.decode(&data[total_consumed..], 1)?; + total_consumed += n; + self.remaining_capacity -= 1; } Ok(total_consumed) } /// Produce a `RecordBatch` if at least one row is fully decoded, returning /// `Ok(None)` if no new rows are available. 
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> { - if self.decoded_rows == 0 { - Ok(None) - } else { - let batch = self.record_decoder.flush()?; - self.decoded_rows = 0; - Ok(Some(batch)) + if self.remaining_capacity == self.batch_size { + return Ok(None); + } + let batch = self.active_decoder.flush()?; + self.remaining_capacity = self.batch_size; + // Apply a pending schema switch if one is staged + if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() { + // Cache the old decoder before replacing it + if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) { + let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder); + self.cache.shift_remove(&old_fingerprint); + self.cache.insert(old_fingerprint, old_decoder); + if self.cache.len() > self.max_cache_size { + self.cache.shift_remove_index(0); + } + } else { + self.active_decoder = new_decoder; + } + } + Ok(Some(batch)) + } + + #[inline] + fn handle_prefix( + &mut self, + buf: &[u8], + hash_type: FingerprintAlgorithm, + ) -> Result<Option<usize>, ArrowError> { + if self.writer_schema_store.is_none() || !buf.starts_with(&SINGLE_OBJECT_MAGIC) { + return Ok(None); + } + let full_len = prefix_len(hash_type); + if buf.len() < full_len { + return Ok(Some(0)); // Not enough data to read the full fingerprint + } + let fp_bytes = &buf[2..full_len]; + let new_fp = match hash_type { + FingerprintAlgorithm::Rabin => { + let Ok(bytes) = <[u8; 8]>::try_from(fp_bytes) else { + return Err(ArrowError::ParseError(format!( + "Invalid Rabin fingerprint length, expected 8, got {}", + fp_bytes.len() + ))); + }; + Fingerprint::Rabin(u64::from_le_bytes(bytes)) + } + }; Review Comment: This code has 3-4 bounds checks (one implied by indexing). Can we fold it all into a single block of code (eliminating the need for `prefix_len` helper)? 
```suggestion let fp_bytes = &buf[2..]; // safe thanks to the `starts_with` check above let new_fp = match hash_type { FingerprintAlgorithm::Rabin => { let Some(Ok(bytes)) = fp_bytes.get(..8).map(<[u8; 8]>::try_from) else { return Err(ArrowError::ParseError(format!( "Invalid Rabin fingerprint length, expected 8, got {}", fp_bytes.len() ))); }; Fingerprint::Rabin(u64::from_le_bytes(bytes)) } }; ``` ########## arrow-avro/src/schema.rs: ########## @@ -260,13 +274,369 @@ pub struct Fixed<'a> { pub attributes: Attributes<'a>, } +/// Supported fingerprint algorithms for Avro schema identification. +/// Currently only `Rabin` is supported, `SHA256` and `MD5` support will come in a future update +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum FingerprintAlgorithm { + /// 64‑bit CRC‑64‑AVRO Rabin fingerprint. + Rabin, +} + +/// A schema fingerprint in one of the supported formats. +/// +/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore` +/// instance always stores only one variant, matching its configured +/// `FingerprintAlgorithm`, but the enum makes the API uniform. +/// Currently only `Rabin` is supported +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints> +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Fingerprint { + /// A 64-bit Rabin fingerprint. + Rabin(u64), +} + +/// Allow easy extraction of the algorithm used to create a fingerprint. +impl From<&Fingerprint> for FingerprintAlgorithm { + #[inline] + fn from(fp: &Fingerprint) -> Self { + match fp { + Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin, + } + } +} + +/// Generates a fingerprint for the given `Schema` using the specified `FingerprintAlgorithm`.
+#[inline] +pub(crate) fn generate_fingerprint( + schema: &Schema, + hash_type: FingerprintAlgorithm, +) -> Result<Fingerprint, ArrowError> { + let canonical = generate_canonical_form(schema).map_err(|e| { + ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}")) + })?; + match hash_type { + FingerprintAlgorithm::Rabin => { + Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical))) + } + } +} + +/// Generates the 64-bit Rabin fingerprint for the given `Schema`. +/// +/// The fingerprint is computed from the canonical form of the schema. +/// This is also known as `CRC-64-AVRO`. +/// +/// # Returns +/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint. +#[inline] +pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, ArrowError> { + generate_fingerprint(schema, FingerprintAlgorithm::Rabin) +} + +/// Generates the Parsed Canonical Form for the given [`Schema`]. +/// +/// The canonical form is a standardized JSON representation of the schema, +/// primarily used for generating a schema fingerprint for equality checking. +/// +/// This form strips attributes that do not affect the schema's identity, +/// such as `doc` fields, `aliases`, and any properties not defined in the +/// Avro specification. +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas> +#[inline] +pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> { + build_canonical(schema, None) +} + +/// An in-memory cache of Avro schemas, indexed by their fingerprint. +/// +/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently. +/// Each schema is associated with a unique [`Fingerprint`], which is generated based +/// on the schema's canonical form and a specific hashing algorithm. 
+/// +/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin, +/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations. +/// This ensures consistency when generating fingerprints and looking up schemas. +/// All schemas registered will have their fingerprint computed with this algorithm, and +/// lookups must use a matching fingerprint. +/// +/// The lifetime parameter `'a` corresponds to the lifetime of the string slices +/// contained within the stored [`Schema`] objects. This means the `SchemaStore` +/// cannot outlive the data referenced by the schemas it contains. +/// +/// # Examples +/// +/// ```no_run +/// // Create a new store with the default Rabin fingerprinting. +/// use arrow_avro::schema::{PrimitiveType, Schema, SchemaStore, TypeName}; +/// +/// let mut store = SchemaStore::new(); +/// let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String)); +/// // Register the schema to get its fingerprint. +/// let fingerprint = store.register(schema.clone()).unwrap(); +/// // Use the fingerprint to look up the schema. +/// let retrieved_schema = store.lookup(&fingerprint).cloned(); +/// assert_eq!(retrieved_schema, Some(schema)); +/// ``` +#[derive(Debug, Clone)] +pub struct SchemaStore<'a> { + /// The hashing algorithm used for generating fingerprints. + fingerprint_algorithm: FingerprintAlgorithm, + /// A map from a schema's fingerprint to the schema itself. + schemas: HashMap<Fingerprint, Schema<'a>>, +} + +impl<'a> TryFrom<&'a [Schema<'a>]> for SchemaStore<'a> { + type Error = ArrowError; + + /// Creates a `SchemaStore` from a slice of schemas. + /// Each schema in the slice is registered with the new store. 
+ fn try_from(schemas: &'a [Schema<'a>]) -> Result<Self, Self::Error> { + let mut store = SchemaStore::new(); + for schema in schemas { + store.register(schema.clone())?; + } + Ok(store) + } +} + +impl<'a> Default for SchemaStore<'a> { + fn default() -> Self { + Self { + fingerprint_algorithm: FingerprintAlgorithm::Rabin, + schemas: HashMap::new(), + } + } +} + +impl<'a> SchemaStore<'a> { + /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin). + pub fn new() -> Self { + Self::default() + } + + /// Registers a schema with the store and returns its fingerprint. + /// + /// A fingerprint is calculated for the given schema using the store's configured + /// hash type. If a schema with the same fingerprint does not already exist in the + /// store, the new schema is inserted. If the fingerprint already exists, the + /// existing schema is not overwritten. + /// + /// # Arguments + /// + /// * `schema` - The schema to register. + /// + /// # Returns + /// + /// A `Result` containing the `Fingerprint` of the schema if successful, + /// or an `ArrowError` on failure. + pub fn register(&mut self, schema: Schema<'a>) -> Result<Fingerprint, ArrowError> { + let fp = generate_fingerprint(&schema, self.fingerprint_algorithm)?; + match self.schemas.entry(fp) { + Entry::Occupied(entry) => { + if entry.get() != &schema { + return Err(ArrowError::ComputeError(format!( + "Schema fingerprint collision detected for fingerprint {fp:?}" + ))); + } + } + Entry::Vacant(entry) => { + entry.insert(schema); + } + } + Ok(fp) + } + + /// Looks up a schema by its `Fingerprint`. + /// + /// # Arguments + /// + /// * `fp` - A reference to the `Fingerprint` of the schema to look up. + /// + /// # Returns + /// + /// An `Option` containing a clone of the `Schema` if found, otherwise `None`. 
+ pub fn lookup(&self, fp: &Fingerprint) -> Option<&Schema<'a>> { + self.schemas.get(fp) + } + + /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting. + pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm { + self.fingerprint_algorithm + } +} + +#[inline] +fn quote(s: &str) -> Result<String, ArrowError> { + serde_json::to_string(s) + .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}"))) +} + +fn make_fullname(name: &str, namespace_attr: Option<&str>, enclosing_ns: Option<&str>) -> String { + match namespace_attr.or(enclosing_ns) { + Some(ns) if !name.contains('.') => format!("{ns}.{name}"), + _ => name.to_string(), + } +} + +fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> { + #[inline] + fn prim_str(pt: &PrimitiveType) -> &'static str { + match pt { + PrimitiveType::Null => "null", + PrimitiveType::Boolean => "boolean", + PrimitiveType::Int => "int", + PrimitiveType::Long => "long", + PrimitiveType::Float => "float", + PrimitiveType::Double => "double", + PrimitiveType::Bytes => "bytes", + PrimitiveType::String => "string", + } + } + Ok(match schema { + Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn { + TypeName::Primitive(pt) => quote(prim_str(pt))?, + TypeName::Ref(name) => quote(&make_fullname(name, None, enclosing_ns))?, + }, + Schema::Union(branches) => format!( + "[{}]", + branches + .iter() + .map(|b| build_canonical(b, enclosing_ns)) + .collect::<Result<Vec<_>, _>>()? 
+ .join(",") + ), + Schema::Complex(ct) => match ct { + ComplexType::Record(r) => { + let fullname = make_fullname(r.name, r.namespace, enclosing_ns); + let child_ns = fullname.rsplit_once('.').map(|(ns, _)| ns.to_string()); + let fields = r + .fields + .iter() + .map(|f| { + let field_type = + build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?; + Ok(format!( + "{{\"name\":{},\"type\":{}}}", + quote(f.name)?, + field_type + )) + }) + .collect::<Result<Vec<_>, ArrowError>>()? + .join(","); + format!( + "{{\"name\":{},\"type\":\"record\",\"fields\":[{}]}}", + quote(&fullname)?, + fields + ) + } + ComplexType::Enum(e) => { + let fullname = make_fullname(e.name, e.namespace, enclosing_ns); + let symbols = e + .symbols + .iter() + .map(|s| quote(s)) + .collect::<Result<Vec<_>, _>>()? + .join(","); + format!( + "{{\"name\":{},\"type\":\"enum\",\"symbols\":[{}]}}", + quote(&fullname)?, + symbols + ) + } + ComplexType::Array(arr) => format!( + "{{\"type\":\"array\",\"items\":{}}}", + build_canonical(&arr.items, enclosing_ns)? + ), + ComplexType::Map(map) => format!( + "{{\"type\":\"map\",\"values\":{}}}", + build_canonical(&map.values, enclosing_ns)? + ), + ComplexType::Fixed(f) => format!( + "{{\"name\":{},\"type\":\"fixed\",\"size\":{}}}", + quote(&make_fullname(f.name, f.namespace, enclosing_ns))?, + f.size + ), + }, + }) +} + +/// 64‑bit Rabin fingerprint as described in the Avro spec. +const EMPTY: u64 = 0xc15d_213a_a4d7_a795; + +/// Build one entry of the polynomial division table. +const fn one_entry(i: usize) -> u64 { + let mut fp = i as u64; + let mut j = 0; + while j < 8 { + fp = (fp >> 1) ^ (EMPTY & (0u64.wrapping_sub(fp & 1))); + j += 1; + } + fp +} + +/// Build the full 256‑entry table at compile time. +const fn build_table() -> [u64; 256] { + let mut table = [0u64; 256]; + let mut i = 0; + while i < 256 { + table[i] = one_entry(i); + i += 1; + } + table +} + +/// The pre‑computed table. 
+static FINGERPRINT_TABLE: [u64; 256] = build_table(); + +/// Computes the 64-bit Rabin fingerprint for a given canonical schema string. +/// This implementation is based on the Avro specification for schema fingerprinting. +#[inline] Review Comment: is it actually important to manually mark so many functions `#[inline]`? From what I've seen, rustc is anyway super aggressive about inlining, so I would be surprised if there are really so many cases where it hurt performance by choosing not to inline these function calls automatically? ########## arrow-avro/src/schema.rs: ########## @@ -260,13 +274,369 @@ pub struct Fixed<'a> { pub attributes: Attributes<'a>, } +/// Supported fingerprint algorithms for Avro schema identification. +/// Currently only `Rabin` is supported, `SHA256` and `MD5` support will come in a future update +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum FingerprintAlgorithm { + /// 64‑bit CRC‑64‑AVRO Rabin fingerprint. + Rabin, +} + +/// A schema fingerprint in one of the supported formats. +/// +/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore` +/// instance always stores only one variant, matching its configured +/// `FingerprintAlgorithm`, but the enum makes the API uniform. +/// Currently only `Rabin` is supported +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints> +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Fingerprint { + /// A 64-bit Rabin fingerprint. + Rabin(u64), +} + +/// Allow easy extraction of the algorithm used to create a fingerprint. +impl From<&Fingerprint> for FingerprintAlgorithm { + #[inline] + fn from(fp: &Fingerprint) -> Self { + match fp { + Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin, + } + } +} + +/// Generates a fingerprint for the given `Schema` using the specified `FingerprintAlgorithm`. 
+#[inline] +pub(crate) fn generate_fingerprint( + schema: &Schema, + hash_type: FingerprintAlgorithm, +) -> Result<Fingerprint, ArrowError> { + let canonical = generate_canonical_form(schema).map_err(|e| { + ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}")) + })?; + match hash_type { + FingerprintAlgorithm::Rabin => { + Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical))) + } + } +} + +/// Generates the 64-bit Rabin fingerprint for the given `Schema`. +/// +/// The fingerprint is computed from the canonical form of the schema. +/// This is also known as `CRC-64-AVRO`. +/// +/// # Returns +/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint. +#[inline] +pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, ArrowError> { + generate_fingerprint(schema, FingerprintAlgorithm::Rabin) +} + +/// Generates the Parsed Canonical Form for the given [`Schema`]. +/// +/// The canonical form is a standardized JSON representation of the schema, +/// primarily used for generating a schema fingerprint for equality checking. +/// +/// This form strips attributes that do not affect the schema's identity, +/// such as `doc` fields, `aliases`, and any properties not defined in the +/// Avro specification. +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas> +#[inline] +pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> { + build_canonical(schema, None) +} + +/// An in-memory cache of Avro schemas, indexed by their fingerprint. +/// +/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently. +/// Each schema is associated with a unique [`Fingerprint`], which is generated based +/// on the schema's canonical form and a specific hashing algorithm. 
+/// +/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin, +/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations. +/// This ensures consistency when generating fingerprints and looking up schemas. +/// All schemas registered will have their fingerprint computed with this algorithm, and +/// lookups must use a matching fingerprint. +/// +/// The lifetime parameter `'a` corresponds to the lifetime of the string slices +/// contained within the stored [`Schema`] objects. This means the `SchemaStore` +/// cannot outlive the data referenced by the schemas it contains. +/// +/// # Examples +/// +/// ```no_run +/// // Create a new store with the default Rabin fingerprinting. +/// use arrow_avro::schema::{PrimitiveType, Schema, SchemaStore, TypeName}; +/// +/// let mut store = SchemaStore::new(); +/// let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String)); +/// // Register the schema to get its fingerprint. +/// let fingerprint = store.register(schema.clone()).unwrap(); +/// // Use the fingerprint to look up the schema. +/// let retrieved_schema = store.lookup(&fingerprint).cloned(); +/// assert_eq!(retrieved_schema, Some(schema)); +/// ``` +#[derive(Debug, Clone)] +pub struct SchemaStore<'a> { + /// The hashing algorithm used for generating fingerprints. + fingerprint_algorithm: FingerprintAlgorithm, + /// A map from a schema's fingerprint to the schema itself. + schemas: HashMap<Fingerprint, Schema<'a>>, +} + +impl<'a> TryFrom<&'a [Schema<'a>]> for SchemaStore<'a> { + type Error = ArrowError; + + /// Creates a `SchemaStore` from a slice of schemas. + /// Each schema in the slice is registered with the new store. 
+ fn try_from(schemas: &'a [Schema<'a>]) -> Result<Self, Self::Error> { + let mut store = SchemaStore::new(); + for schema in schemas { + store.register(schema.clone())?; + } + Ok(store) + } +} + +impl<'a> Default for SchemaStore<'a> { + fn default() -> Self { + Self { + fingerprint_algorithm: FingerprintAlgorithm::Rabin, + schemas: HashMap::new(), + } + } +} + +impl<'a> SchemaStore<'a> { + /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin). + pub fn new() -> Self { + Self::default() + } + + /// Registers a schema with the store and returns its fingerprint. + /// + /// A fingerprint is calculated for the given schema using the store's configured + /// hash type. If a schema with the same fingerprint does not already exist in the + /// store, the new schema is inserted. If the fingerprint already exists, the + /// existing schema is not overwritten. + /// + /// # Arguments + /// + /// * `schema` - The schema to register. + /// + /// # Returns + /// + /// A `Result` containing the `Fingerprint` of the schema if successful, + /// or an `ArrowError` on failure. + pub fn register(&mut self, schema: Schema<'a>) -> Result<Fingerprint, ArrowError> { + let fp = generate_fingerprint(&schema, self.fingerprint_algorithm)?; + match self.schemas.entry(fp) { + Entry::Occupied(entry) => { + if entry.get() != &schema { + return Err(ArrowError::ComputeError(format!( + "Schema fingerprint collision detected for fingerprint {fp:?}" + ))); + } + } + Entry::Vacant(entry) => { + entry.insert(schema); + } + } + Ok(fp) + } + + /// Looks up a schema by its `Fingerprint`. + /// + /// # Arguments + /// + /// * `fp` - A reference to the `Fingerprint` of the schema to look up. + /// + /// # Returns + /// + /// An `Option` containing a clone of the `Schema` if found, otherwise `None`. 
+ pub fn lookup(&self, fp: &Fingerprint) -> Option<&Schema<'a>> { + self.schemas.get(fp) + } + + /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting. + pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm { + self.fingerprint_algorithm + } +} + +#[inline] +fn quote(s: &str) -> Result<String, ArrowError> { + serde_json::to_string(s) + .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}"))) +} + +fn make_fullname(name: &str, namespace_attr: Option<&str>, enclosing_ns: Option<&str>) -> String { + match namespace_attr.or(enclosing_ns) { + Some(ns) if !name.contains('.') => format!("{ns}.{name}"), + _ => name.to_string(), + } +} + +fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> { + #[inline] + fn prim_str(pt: &PrimitiveType) -> &'static str { + match pt { + PrimitiveType::Null => "null", + PrimitiveType::Boolean => "boolean", + PrimitiveType::Int => "int", + PrimitiveType::Long => "long", + PrimitiveType::Float => "float", + PrimitiveType::Double => "double", + PrimitiveType::Bytes => "bytes", + PrimitiveType::String => "string", + } + } + Ok(match schema { + Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn { + TypeName::Primitive(pt) => quote(prim_str(pt))?, + TypeName::Ref(name) => quote(&make_fullname(name, None, enclosing_ns))?, + }, + Schema::Union(branches) => format!( + "[{}]", + branches + .iter() + .map(|b| build_canonical(b, enclosing_ns)) + .collect::<Result<Vec<_>, _>>()? 
+ .join(",") + ), + Schema::Complex(ct) => match ct { + ComplexType::Record(r) => { + let fullname = make_fullname(r.name, r.namespace, enclosing_ns); + let child_ns = fullname.rsplit_once('.').map(|(ns, _)| ns.to_string()); + let fields = r + .fields + .iter() + .map(|f| { + let field_type = + build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?; + Ok(format!( + "{{\"name\":{},\"type\":{}}}", + quote(f.name)?, + field_type + )) + }) + .collect::<Result<Vec<_>, ArrowError>>()? + .join(","); + format!( + "{{\"name\":{},\"type\":\"record\",\"fields\":[{}]}}", + quote(&fullname)?, + fields + ) + } + ComplexType::Enum(e) => { + let fullname = make_fullname(e.name, e.namespace, enclosing_ns); + let symbols = e + .symbols + .iter() + .map(|s| quote(s)) + .collect::<Result<Vec<_>, _>>()? + .join(","); + format!( + "{{\"name\":{},\"type\":\"enum\",\"symbols\":[{}]}}", + quote(&fullname)?, + symbols + ) + } + ComplexType::Array(arr) => format!( + "{{\"type\":\"array\",\"items\":{}}}", + build_canonical(&arr.items, enclosing_ns)? + ), + ComplexType::Map(map) => format!( + "{{\"type\":\"map\",\"values\":{}}}", + build_canonical(&map.values, enclosing_ns)? + ), + ComplexType::Fixed(f) => format!( + "{{\"name\":{},\"type\":\"fixed\",\"size\":{}}}", + quote(&make_fullname(f.name, f.namespace, enclosing_ns))?, + f.size + ), + }, + }) +} + +/// 64‑bit Rabin fingerprint as described in the Avro spec. +const EMPTY: u64 = 0xc15d_213a_a4d7_a795; + +/// Build one entry of the polynomial division table. +const fn one_entry(i: usize) -> u64 { + let mut fp = i as u64; + let mut j = 0; + while j < 8 { + fp = (fp >> 1) ^ (EMPTY & (0u64.wrapping_sub(fp & 1))); + j += 1; + } + fp Review Comment: I guess we can't use iterators in `const` context? Otherwise `0..8` would be helpful here. ########## arrow-avro/src/reader/mod.rs: ########## @@ -272,17 +450,70 @@ impl ReaderBuilder { self } - /// Sets the Avro schema. + /// Sets the Avro reader schema. 
/// /// If a schema is not provided, the schema will be read from the Avro file header. - pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self { - self.schema = Some(schema); + pub fn with_reader_schema(mut self, reader_schema: AvroSchema<'static>) -> Self { + self.reader_schema = Some(reader_schema); self } + /// Sets the `SchemaStore` used for resolving writer schemas. + /// + /// This is necessary when decoding single-object encoded data that identifies + /// schemas by a fingerprint. The store allows the decoder to look up the + /// full writer schema from a fingerprint embedded in the data. + /// + /// Defaults to `None`. + pub fn with_writer_schema_store(mut self, store: SchemaStore<'static>) -> Self { + self.writer_schema_store = Some(store); + self + } + + /// Sets the initial schema fingerprint for decoding single-object encoded data. + /// + /// This is useful when the data stream does not begin with a schema definition + /// or fingerprint, allowing the decoder to start with a known schema from the + /// `SchemaStore`. + /// + /// Defaults to `None`. + pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self { + self.active_fingerprint = Some(fp); + self + } + + /// Set the maximum number of decoders to cache. + /// + /// When dealing with Avro files that contain multiple schemas, we may need to switch + /// between different decoders. This cache avoids rebuilding them from scratch every time. + /// + /// Defaults to `20`. 
+ pub fn with_max_decoder_cache_size(mut self, n: usize) -> Self { + self.decoder_cache_size = n; + self + } + + fn validate(&self) -> Result<(), ArrowError> { + match ( + self.writer_schema_store.as_ref(), + self.reader_schema.as_ref(), + self.active_fingerprint.as_ref(), + ) { + (Some(_), None, _) => Err(ArrowError::ParseError( + "Reader schema must be set when writer schema store is provided".into(), + )), + (None, _, Some(_)) => Err(ArrowError::ParseError( + "Active fingerprint requires a writer schema store".into(), + )), + _ => Ok(()), Review Comment: Are we certain all other combos are valid? Perhaps better to enumerate the known-good and known-bad cases instead? Seems like we shouldn't even need a default match arm at that point. ########## arrow-avro/src/reader/mod.rs: ########## @@ -168,48 +167,58 @@ impl Decoder { /// /// Returns the number of bytes consumed. pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> { + if self.active_fingerprint.is_none() + && self.writer_schema_store.is_some() + && !data.starts_with(&SINGLE_OBJECT_MAGIC) + { + return Err(ArrowError::ParseError( + "Expected single‑object encoding fingerprint prefix for first message \ + (writer_schema_store is set but active_fingerprint is None)" + .into(), + )); + } let mut total_consumed = 0usize; - let hash_type = self.schema_store.as_ref().map_or( + let hash_type = self.writer_schema_store.as_ref().map_or( FingerprintAlgorithm::Rabin, SchemaStore::fingerprint_algorithm, ); - while total_consumed < data.len() && self.decoded_rows < self.batch_size { + while total_consumed < data.len() && self.remaining_capacity > 0 { if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? { - // Schema change detected (> 0) or there were insufficient bytes to read the next prefix (= 0). - // If the former, the batch must end because the next record has a different schema. 
- // If the latter, batch ends because the caller needs to fetch more bytes.
+ // A batch is complete when its `remaining_capacity` is 0. It may be completed early if + // a schema change is detected or there are insufficient bytes to read the next prefix. + // A schema change requires a new batch. Review Comment: Though, on re-reading, the comment seems to talk about both locations, so it probably won't fit well in either one. Maybe it should be at L185 and explain the loop as a whole? ########## arrow-avro/src/codec.rs: ########## @@ -139,6 +139,22 @@ impl AvroField { pub fn name(&self) -> &str { &self.name } + + /// Performs schema resolution between a writer and reader schema. + /// + /// This is the primary entry point for handling schema evolution. It produces an + /// `AvroField` that contains all the necessary information to read data written + /// with the `writer` schema as if it were written with the `reader` schema. + pub fn resolve_from_writer_and_reader<'a>( + writer: &'a Schema<'a>, + reader: &'a Schema<'a>, Review Comment: Perhaps we could rename to disambiguate? `ArrowSchemaRef` vs. `[Avro]Schema`? ########## arrow-avro/src/schema.rs: ########## @@ -260,13 +274,369 @@ pub struct Fixed<'a> { pub attributes: Attributes<'a>, } +/// Supported fingerprint algorithms for Avro schema identification. +/// Currently only `Rabin` is supported, `SHA256` and `MD5` support will come in a future update +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum FingerprintAlgorithm { + /// 64‑bit CRC‑64‑AVRO Rabin fingerprint. + Rabin, +} + +/// A schema fingerprint in one of the supported formats. +/// +/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore` +/// instance always stores only one variant, matching its configured +/// `FingerprintAlgorithm`, but the enum makes the API uniform. +/// Currently only `Rabin` is supported +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints> +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Fingerprint { + /// A 64-bit Rabin fingerprint.
+ Rabin(u64), +} + +/// Allow easy extraction of the algorithm used to create a fingerprint. +impl From<&Fingerprint> for FingerprintAlgorithm { + #[inline] + fn from(fp: &Fingerprint) -> Self { + match fp { + Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin, + } + } +} + +/// Generates a fingerprint for the given `Schema` using the specified `FingerprintAlgorithm`. +#[inline] +pub(crate) fn generate_fingerprint( + schema: &Schema, + hash_type: FingerprintAlgorithm, +) -> Result<Fingerprint, ArrowError> { + let canonical = generate_canonical_form(schema).map_err(|e| { + ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}")) + })?; + match hash_type { + FingerprintAlgorithm::Rabin => { + Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical))) + } + } +} + +/// Generates the 64-bit Rabin fingerprint for the given `Schema`. +/// +/// The fingerprint is computed from the canonical form of the schema. +/// This is also known as `CRC-64-AVRO`. +/// +/// # Returns +/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint. +#[inline] +pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, ArrowError> { + generate_fingerprint(schema, FingerprintAlgorithm::Rabin) +} + +/// Generates the Parsed Canonical Form for the given [`Schema`]. +/// +/// The canonical form is a standardized JSON representation of the schema, +/// primarily used for generating a schema fingerprint for equality checking. +/// +/// This form strips attributes that do not affect the schema's identity, +/// such as `doc` fields, `aliases`, and any properties not defined in the +/// Avro specification. +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas> +#[inline] +pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> { + build_canonical(schema, None) +} + +/// An in-memory cache of Avro schemas, indexed by their fingerprint. 
+/// +/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently. +/// Each schema is associated with a unique [`Fingerprint`], which is generated based +/// on the schema's canonical form and a specific hashing algorithm. +/// +/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin, +/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations. +/// This ensures consistency when generating fingerprints and looking up schemas. +/// All schemas registered will have their fingerprint computed with this algorithm, and +/// lookups must use a matching fingerprint. +/// +/// The lifetime parameter `'a` corresponds to the lifetime of the string slices +/// contained within the stored [`Schema`] objects. This means the `SchemaStore` +/// cannot outlive the data referenced by the schemas it contains. +/// +/// # Examples +/// +/// ```no_run +/// // Create a new store with the default Rabin fingerprinting. +/// use arrow_avro::schema::{PrimitiveType, Schema, SchemaStore, TypeName}; +/// +/// let mut store = SchemaStore::new(); +/// let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String)); +/// // Register the schema to get its fingerprint. +/// let fingerprint = store.register(schema.clone()).unwrap(); +/// // Use the fingerprint to look up the schema. +/// let retrieved_schema = store.lookup(&fingerprint).cloned(); +/// assert_eq!(retrieved_schema, Some(schema)); +/// ``` +#[derive(Debug, Clone)] +pub struct SchemaStore<'a> { + /// The hashing algorithm used for generating fingerprints. + fingerprint_algorithm: FingerprintAlgorithm, + /// A map from a schema's fingerprint to the schema itself. + schemas: HashMap<Fingerprint, Schema<'a>>, +} + +impl<'a> TryFrom<&'a [Schema<'a>]> for SchemaStore<'a> { + type Error = ArrowError; + + /// Creates a `SchemaStore` from a slice of schemas. + /// Each schema in the slice is registered with the new store. 
+ fn try_from(schemas: &'a [Schema<'a>]) -> Result<Self, Self::Error> { + let mut store = SchemaStore::new(); + for schema in schemas { + store.register(schema.clone())?; + } + Ok(store) + } +} + +impl<'a> Default for SchemaStore<'a> { + fn default() -> Self { + Self { + fingerprint_algorithm: FingerprintAlgorithm::Rabin, + schemas: HashMap::new(), + } + } +} + +impl<'a> SchemaStore<'a> { + /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin). + pub fn new() -> Self { + Self::default() + } + + /// Registers a schema with the store and returns its fingerprint. + /// + /// A fingerprint is calculated for the given schema using the store's configured + /// hash type. If a schema with the same fingerprint does not already exist in the + /// store, the new schema is inserted. If the fingerprint already exists, the + /// existing schema is not overwritten. + /// + /// # Arguments + /// + /// * `schema` - The schema to register. + /// + /// # Returns + /// + /// A `Result` containing the `Fingerprint` of the schema if successful, + /// or an `ArrowError` on failure. + pub fn register(&mut self, schema: Schema<'a>) -> Result<Fingerprint, ArrowError> { + let fp = generate_fingerprint(&schema, self.fingerprint_algorithm)?; + match self.schemas.entry(fp) { + Entry::Occupied(entry) => { + if entry.get() != &schema { + return Err(ArrowError::ComputeError(format!( + "Schema fingerprint collision detected for fingerprint {fp:?}" + ))); + } + } + Entry::Vacant(entry) => { + entry.insert(schema); + } + } + Ok(fp) + } + + /// Looks up a schema by its `Fingerprint`. + /// + /// # Arguments + /// + /// * `fp` - A reference to the `Fingerprint` of the schema to look up. + /// + /// # Returns + /// + /// An `Option` containing a clone of the `Schema` if found, otherwise `None`. 
+ pub fn lookup(&self, fp: &Fingerprint) -> Option<&Schema<'a>> { + self.schemas.get(fp) + } + + /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting. + pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm { + self.fingerprint_algorithm + } +} + +#[inline] +fn quote(s: &str) -> Result<String, ArrowError> { + serde_json::to_string(s) + .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}"))) +} + +fn make_fullname(name: &str, namespace_attr: Option<&str>, enclosing_ns: Option<&str>) -> String { + match namespace_attr.or(enclosing_ns) { + Some(ns) if !name.contains('.') => format!("{ns}.{name}"), + _ => name.to_string(), + } +} + +fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> { + #[inline] + fn prim_str(pt: &PrimitiveType) -> &'static str { + match pt { + PrimitiveType::Null => "null", + PrimitiveType::Boolean => "boolean", + PrimitiveType::Int => "int", + PrimitiveType::Long => "long", + PrimitiveType::Float => "float", + PrimitiveType::Double => "double", + PrimitiveType::Bytes => "bytes", + PrimitiveType::String => "string", + } + } + Ok(match schema { + Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn { + TypeName::Primitive(pt) => quote(prim_str(pt))?, + TypeName::Ref(name) => quote(&make_fullname(name, None, enclosing_ns))?, + }, + Schema::Union(branches) => format!( + "[{}]", + branches + .iter() + .map(|b| build_canonical(b, enclosing_ns)) + .collect::<Result<Vec<_>, _>>()? + .join(",") + ), + Schema::Complex(ct) => match ct { + ComplexType::Record(r) => { + let fullname = make_fullname(r.name, r.namespace, enclosing_ns); + let child_ns = fullname.rsplit_once('.').map(|(ns, _)| ns.to_string()); Review Comment: This seems a rather indirect and complicated way to extract the child's namespace? Can we rework the helper function boundaries so we don't need string splits here? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org