scovich commented on code in PR #8039:
URL: https://github.com/apache/arrow-rs/pull/8039#discussion_r2255018862


##########
arrow-avro/src/schema.rs:
##########
@@ -260,13 +276,388 @@ pub struct Fixed<'a> {
     pub attributes: Attributes<'a>,
 }
 
+/// Supported fingerprint algorithms for Avro schema identification.
+/// Currently only `Rabin` is supported, `SHA256` and `MD5` support will come 
in a future update
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
+pub enum FingerprintAlgorithm {
+    /// 64‑bit CRC‑64‑AVRO Rabin fingerprint.
+    #[default]
+    Rabin,
+}
+
+/// A schema fingerprint in one of the supported formats.
+///
+/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore`
+/// instance always stores only one variant, matching its configured
+/// `FingerprintAlgorithm`, but the enum makes the API uniform.
+/// Currently only `Rabin` is supported
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum Fingerprint {
+    /// A 64-bit Rabin fingerprint.
+    Rabin(u64),
+}
+
+/// Allow easy extraction of the algorithm used to create a fingerprint.
+impl From<&Fingerprint> for FingerprintAlgorithm {
+    fn from(fp: &Fingerprint) -> Self {
+        match fp {
+            Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
+        }
+    }
+}
+
+/// Generates a fingerprint for the given `Schema` using the specified 
`FingerprintAlgorithm`.
+pub(crate) fn generate_fingerprint(
+    schema: &Schema,
+    hash_type: FingerprintAlgorithm,
+) -> Result<Fingerprint, ArrowError> {
+    let canonical = generate_canonical_form(schema).map_err(|e| {
+        ArrowError::ComputeError(format!("Failed to generate canonical form 
for schema: {e}"))
+    })?;
+    match hash_type {
+        FingerprintAlgorithm::Rabin => {
+            Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
+        }
+    }
+}
+
+/// Generates the 64-bit Rabin fingerprint for the given `Schema`.
+///
+/// The fingerprint is computed from the canonical form of the schema.
+/// This is also known as `CRC-64-AVRO`.
+///
+/// # Returns
+/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint.
+pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, 
ArrowError> {
+    generate_fingerprint(schema, FingerprintAlgorithm::Rabin)
+}
+
+/// Generates the Parsed Canonical Form for the given [`Schema`].
+///
+/// The canonical form is a standardized JSON representation of the schema,
+/// primarily used for generating a schema fingerprint for equality checking.
+///
+/// This form strips attributes that do not affect the schema's identity,
+/// such as `doc` fields, `aliases`, and any properties not defined in the
+/// Avro specification.
+///
+/// 
<https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
+pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
+    build_canonical(schema, None)
+}
+
+/// An in-memory cache of Avro schemas, indexed by their fingerprint.
+///
+/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas 
efficiently.
+/// Each schema is associated with a unique [`Fingerprint`], which is 
generated based
+/// on the schema's canonical form and a specific hashing algorithm.
+///
+/// A `SchemaStore` instance is configured to use a single 
[`FingerprintAlgorithm`] such as Rabin,
+/// MD5 (not yet supported), or SHA256 (not yet supported) for all its 
operations.
+/// This ensures consistency when generating fingerprints and looking up 
schemas.
+/// All schemas registered will have their fingerprint computed with this 
algorithm, and
+/// lookups must use a matching fingerprint.
+///
+/// The lifetime parameter `'a` corresponds to the lifetime of the string 
slices
+/// contained within the stored [`Schema`] objects. This means the 
`SchemaStore`
+/// cannot outlive the data referenced by the schemas it contains.
+///
+/// # Examples
+///
+/// ```no_run
+/// // Create a new store with the default Rabin fingerprinting.
+/// use arrow_avro::schema::{PrimitiveType, Schema, SchemaStore, TypeName};
+///
+/// let mut store = SchemaStore::new();
+/// let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
+/// // Register the schema to get its fingerprint.
+/// let fingerprint = store.register(schema.clone()).unwrap();
+/// // Use the fingerprint to look up the schema.
+/// let retrieved_schema = store.lookup(&fingerprint).cloned();
+/// assert_eq!(retrieved_schema, Some(schema));
+/// ```
+#[derive(Debug, Clone, Default)]
+pub struct SchemaStore<'a> {
+    /// The hashing algorithm used for generating fingerprints.
+    fingerprint_algorithm: FingerprintAlgorithm,
+    /// A map from a schema's fingerprint to the schema itself.
+    schemas: HashMap<Fingerprint, Schema<'a>>,
+}
+
+impl<'a> TryFrom<&'a [Schema<'a>]> for SchemaStore<'a> {
+    type Error = ArrowError;
+
+    /// Creates a `SchemaStore` from a slice of schemas.
+    /// Each schema in the slice is registered with the new store.
+    fn try_from(schemas: &'a [Schema<'a>]) -> Result<Self, Self::Error> {
+        let mut store = SchemaStore::new();
+        for schema in schemas {
+            store.register(schema.clone())?;
+        }
+        Ok(store)
+    }
+}
+
+impl<'a> SchemaStore<'a> {
+    /// Creates an empty `SchemaStore` using the default fingerprinting 
algorithm (64-bit Rabin).
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Registers a schema with the store and returns its fingerprint.
+    ///
+    /// A fingerprint is calculated for the given schema using the store's 
configured
+    /// hash type. If a schema with the same fingerprint does not already 
exist in the
+    /// store, the new schema is inserted. If the fingerprint already exists, 
the
+    /// existing schema is not overwritten.
+    ///
+    /// # Arguments
+    ///
+    /// * `schema` - The schema to register.
+    ///
+    /// # Returns
+    ///
+    /// A `Result` containing the `Fingerprint` of the schema if successful,
+    /// or an `ArrowError` on failure.
+    pub fn register(&mut self, schema: Schema<'a>) -> Result<Fingerprint, 
ArrowError> {
+        let fingerprint = generate_fingerprint(&schema, 
self.fingerprint_algorithm)?;
+        match self.schemas.entry(fingerprint) {
+            Entry::Occupied(entry) => {
+                if entry.get() != &schema {
+                    return Err(ArrowError::ComputeError(format!(
+                        "Schema fingerprint collision detected for fingerprint 
{fingerprint:?}"
+                    )));
+                }
+            }
+            Entry::Vacant(entry) => {
+                entry.insert(schema);
+            }
+        }
+        Ok(fingerprint)
+    }
+
+    /// Looks up a schema by its `Fingerprint`.
+    ///
+    /// # Arguments
+    ///
+    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to 
look up.
+    ///
+    /// # Returns
+    ///
+    /// An `Option` containing a clone of the `Schema` if found, otherwise 
`None`.
+    pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&Schema<'a>> {
+        self.schemas.get(fingerprint)
+    }
+
+    /// Returns a `Vec` containing **all unique [`Fingerprint`]s** currently
+    /// held by this [`SchemaStore`].
+    ///
+    /// The order of the returned fingerprints is unspecified and should not be
+    /// relied upon.
+    pub fn fingerprints(&self) -> Vec<Fingerprint> {
+        self.schemas.keys().copied().collect()
+    }
+
+    /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for 
fingerprinting.
+    pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
+        self.fingerprint_algorithm
+    }
+}
+
+fn quote(s: &str) -> Result<String, ArrowError> {
+    serde_json::to_string(s)
+        .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: 
{e}")))
+}
+
+// Avro names are defined by a `name` and an optional `namespace`.
+// The full name is composed of the namespace and the name, separated by a dot.
+//
+// Avro specification defines two ways to specify a full name:
+// 1. The `name` attribute contains the full name (e.g., "a.b.c.d").
+//    In this case, the `namespace` attribute is ignored.
+// 2. The `name` attribute contains the simple name (e.g., "d") and the
+//    `namespace` attribute contains the namespace (e.g., "a.b.c").
+//
+// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`.
+// Complex paths with quotes or backticks like `a."hi".b` are not supported.
+//
+// This function constructs the full name and extracts the namespace,
+// handling both ways of specifying the name. It prioritizes a namespace
+// defined within the `name` attribute itself, then the explicit 
`namespace_attr`,
+// and finally the `enclosing_ns`.
+fn make_full_name(
+    name: &str,
+    namespace_attr: Option<&str>,
+    enclosing_ns: Option<&str>,
+) -> (String, Option<String>) {
+    // `name` already contains a dot then treat as full-name, ignore namespace.
+    if let Some((ns, _)) = name.rsplit_once('.') {
+        return (name.to_string(), Some(ns.to_string()));
+    }
+    match namespace_attr.or(enclosing_ns) {
+        Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())),
+        None => (name.to_string(), None),
+    }
+}
+
+fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> 
Result<String, ArrowError> {
+    Ok(match schema {
+        Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match 
tn {
+            TypeName::Primitive(pt) => quote(pt.as_ref())?,
+            TypeName::Ref(name) => {
+                let (full_name, _) = make_full_name(name, None, enclosing_ns);
+                quote(&full_name)?
+            }
+        },
+        Schema::Union(branches) => format!(
+            "[{}]",
+            branches
+                .iter()
+                .map(|b| build_canonical(b, enclosing_ns))
+                .collect::<Result<Vec<_>, _>>()?
+                .join(",")
+        ),
+        Schema::Complex(ct) => match ct {
+            ComplexType::Record(r) => {
+                let (full_name, child_ns) = make_full_name(r.name, 
r.namespace, enclosing_ns);
+                let fields = r
+                    .fields
+                    .iter()
+                    .map(|f| {
+                        let field_type =
+                            build_canonical(&f.r#type, 
child_ns.as_deref().or(enclosing_ns))?;
+                        Ok(format!(
+                            r#"{{"name":{},"type":{}}}"#,

Review Comment:
   Makes sense, yup!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to