scovich commented on code in PR #8039: URL: https://github.com/apache/arrow-rs/pull/8039#discussion_r2255018862
########## arrow-avro/src/schema.rs: ########## @@ -260,13 +276,388 @@ pub struct Fixed<'a> { pub attributes: Attributes<'a>, } +/// Supported fingerprint algorithms for Avro schema identification. +/// Currently only `Rabin` is supported, `SHA256` and `MD5` support will come in a future update +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)] +pub enum FingerprintAlgorithm { + /// 64‑bit CRC‑64‑AVRO Rabin fingerprint. + #[default] + Rabin, +} + +/// A schema fingerprint in one of the supported formats. +/// +/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore` +/// instance always stores only one variant, matching its configured +/// `FingerprintAlgorithm`, but the enum makes the API uniform. +/// Currently only `Rabin` is supported +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints> +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Fingerprint { + /// A 64-bit Rabin fingerprint. + Rabin(u64), +} + +/// Allow easy extraction of the algorithm used to create a fingerprint. +impl From<&Fingerprint> for FingerprintAlgorithm { + fn from(fp: &Fingerprint) -> Self { + match fp { + Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin, + } + } +} + +/// Generates a fingerprint for the given `Schema` using the specified `FingerprintAlgorithm`. +pub(crate) fn generate_fingerprint( + schema: &Schema, + hash_type: FingerprintAlgorithm, +) -> Result<Fingerprint, ArrowError> { + let canonical = generate_canonical_form(schema).map_err(|e| { + ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}")) + })?; + match hash_type { + FingerprintAlgorithm::Rabin => { + Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical))) + } + } +} + +/// Generates the 64-bit Rabin fingerprint for the given `Schema`. +/// +/// The fingerprint is computed from the canonical form of the schema. +/// This is also known as `CRC-64-AVRO`. +/// +/// # Returns +/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint. +pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, ArrowError> { + generate_fingerprint(schema, FingerprintAlgorithm::Rabin) +} + +/// Generates the Parsed Canonical Form for the given [`Schema`]. +/// +/// The canonical form is a standardized JSON representation of the schema, +/// primarily used for generating a schema fingerprint for equality checking. +/// +/// This form strips attributes that do not affect the schema's identity, +/// such as `doc` fields, `aliases`, and any properties not defined in the +/// Avro specification. +/// +/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas> +pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> { + build_canonical(schema, None) +} + +/// An in-memory cache of Avro schemas, indexed by their fingerprint. +/// +/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently. +/// Each schema is associated with a unique [`Fingerprint`], which is generated based +/// on the schema's canonical form and a specific hashing algorithm. +/// +/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin, +/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations. +/// This ensures consistency when generating fingerprints and looking up schemas. +/// All schemas registered will have their fingerprint computed with this algorithm, and +/// lookups must use a matching fingerprint. +/// +/// The lifetime parameter `'a` corresponds to the lifetime of the string slices +/// contained within the stored [`Schema`] objects. This means the `SchemaStore` +/// cannot outlive the data referenced by the schemas it contains. +/// +/// # Examples +/// +/// ```no_run +/// // Create a new store with the default Rabin fingerprinting. +/// use arrow_avro::schema::{PrimitiveType, Schema, SchemaStore, TypeName}; +/// +/// let mut store = SchemaStore::new(); +/// let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String)); +/// // Register the schema to get its fingerprint. +/// let fingerprint = store.register(schema.clone()).unwrap(); +/// // Use the fingerprint to look up the schema. +/// let retrieved_schema = store.lookup(&fingerprint).cloned(); +/// assert_eq!(retrieved_schema, Some(schema)); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct SchemaStore<'a> { + /// The hashing algorithm used for generating fingerprints. + fingerprint_algorithm: FingerprintAlgorithm, + /// A map from a schema's fingerprint to the schema itself. + schemas: HashMap<Fingerprint, Schema<'a>>, +} + +impl<'a> TryFrom<&'a [Schema<'a>]> for SchemaStore<'a> { + type Error = ArrowError; + + /// Creates a `SchemaStore` from a slice of schemas. + /// Each schema in the slice is registered with the new store. + fn try_from(schemas: &'a [Schema<'a>]) -> Result<Self, Self::Error> { + let mut store = SchemaStore::new(); + for schema in schemas { + store.register(schema.clone())?; + } + Ok(store) + } +} + +impl<'a> SchemaStore<'a> { + /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin). + pub fn new() -> Self { + Self::default() + } + + /// Registers a schema with the store and returns its fingerprint. + /// + /// A fingerprint is calculated for the given schema using the store's configured + /// hash type. If a schema with the same fingerprint does not already exist in the + /// store, the new schema is inserted. If the fingerprint already exists, the + /// existing schema is not overwritten. + /// + /// # Arguments + /// + /// * `schema` - The schema to register. + /// + /// # Returns + /// + /// A `Result` containing the `Fingerprint` of the schema if successful, + /// or an `ArrowError` on failure. + pub fn register(&mut self, schema: Schema<'a>) -> Result<Fingerprint, ArrowError> { + let fingerprint = generate_fingerprint(&schema, self.fingerprint_algorithm)?; + match self.schemas.entry(fingerprint) { + Entry::Occupied(entry) => { + if entry.get() != &schema { + return Err(ArrowError::ComputeError(format!( + "Schema fingerprint collision detected for fingerprint {fingerprint:?}" + ))); + } + } + Entry::Vacant(entry) => { + entry.insert(schema); + } + } + Ok(fingerprint) + } + + /// Looks up a schema by its `Fingerprint`. + /// + /// # Arguments + /// + /// * `fingerprint` - A reference to the `Fingerprint` of the schema to look up. + /// + /// # Returns + /// + /// An `Option` containing a clone of the `Schema` if found, otherwise `None`. + pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&Schema<'a>> { + self.schemas.get(fingerprint) + } + + /// Returns a `Vec` containing **all unique [`Fingerprint`]s** currently + /// held by this [`SchemaStore`]. + /// + /// The order of the returned fingerprints is unspecified and should not be + /// relied upon. + pub fn fingerprints(&self) -> Vec<Fingerprint> { + self.schemas.keys().copied().collect() + } + + /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting. + pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm { + self.fingerprint_algorithm + } +} + +fn quote(s: &str) -> Result<String, ArrowError> { + serde_json::to_string(s) + .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}"))) +} + +// Avro names are defined by a `name` and an optional `namespace`. +// The full name is composed of the namespace and the name, separated by a dot. +// +// Avro specification defines two ways to specify a full name: +// 1. The `name` attribute contains the full name (e.g., "a.b.c.d"). +// In this case, the `namespace` attribute is ignored. +// 2. The `name` attribute contains the simple name (e.g., "d") and the +// `namespace` attribute contains the namespace (e.g., "a.b.c"). +// +// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`. +// Complex paths with quotes or backticks like `a."hi".b` are not supported. +// +// This function constructs the full name and extracts the namespace, +// handling both ways of specifying the name. It prioritizes a namespace +// defined within the `name` attribute itself, then the explicit `namespace_attr`, +// and finally the `enclosing_ns`. +fn make_full_name( + name: &str, + namespace_attr: Option<&str>, + enclosing_ns: Option<&str>, +) -> (String, Option<String>) { + // `name` already contains a dot then treat as full-name, ignore namespace. + if let Some((ns, _)) = name.rsplit_once('.') { + return (name.to_string(), Some(ns.to_string())); + } + match namespace_attr.or(enclosing_ns) { + Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())), + None => (name.to_string(), None), + } +} + +fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> { + Ok(match schema { + Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn { + TypeName::Primitive(pt) => quote(pt.as_ref())?, + TypeName::Ref(name) => { + let (full_name, _) = make_full_name(name, None, enclosing_ns); + quote(&full_name)? + } + }, + Schema::Union(branches) => format!( + "[{}]", + branches + .iter() + .map(|b| build_canonical(b, enclosing_ns)) + .collect::<Result<Vec<_>, _>>()? + .join(",") + ), + Schema::Complex(ct) => match ct { + ComplexType::Record(r) => { + let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns); + let fields = r + .fields + .iter() + .map(|f| { + let field_type = + build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?; + Ok(format!( + r#"{{"name":{},"type":{}}}"#, Review Comment: Makes sense, yup! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org