This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new f27feb8  refactor: define avro model for parsing commit metadata (#477)
f27feb8 is described below

commit f27feb8ceed468063e5eec716a768b2f086977df
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Oct 28 21:19:56 2025 -0700

    refactor: define avro model for parsing commit metadata (#477)
---
 Cargo.toml                                 |   3 +-
 crates/core/Cargo.toml                     |   1 +
 crates/core/src/file_group/builder.rs      | 178 ++++++------
 crates/core/src/metadata/commit.rs         | 435 +++++++++++++++++++++++++++++
 crates/core/src/metadata/mod.rs            |   2 +
 crates/core/src/metadata/replace_commit.rs | 130 +++++++++
 crates/core/src/schema/resolver.rs         | 160 ++++++++---
 7 files changed, 779 insertions(+), 130 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 595aeda..bc991d1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -51,7 +51,8 @@ object_store = { version = "~0.11.2", features = ["aws", 
"azure", "gcp"] }
 parquet = { version = "~54.2.0", features = ["async", "object_store"] }
 
 # avro
-apache-avro = { version = "~0.17.0" }
+apache-avro = { version = "~0.17.0", features = ["derive"] }
+apache-avro-derive = { version = "~0.17.0" }
 
 # datafusion
 datafusion = { version = "~46.0.0" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 7cd88af..e8bde12 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -45,6 +45,7 @@ parquet = { workspace = true }
 
 # avro
 apache-avro = { workspace = true }
+apache-avro-derive = { workspace = true }
 
 # serde
 serde = { workspace = true }
diff --git a/crates/core/src/file_group/builder.rs 
b/crates/core/src/file_group/builder.rs
index 9f2d0c3..bbc767c 100644
--- a/crates/core/src/file_group/builder.rs
+++ b/crates/core/src/file_group/builder.rs
@@ -18,6 +18,8 @@
  */
 use crate::error::CoreError;
 use crate::file_group::FileGroup;
+use crate::metadata::commit::HoodieCommitMetadata;
+use crate::metadata::replace_commit::HoodieReplaceCommitMetadata;
 use crate::Result;
 use serde_json::{Map, Value};
 use std::collections::HashSet;
@@ -47,64 +49,48 @@ impl FileGroupMerger for HashSet<FileGroup> {
 }
 
 pub fn build_file_groups(commit_metadata: &Map<String, Value>) -> 
Result<HashSet<FileGroup>> {
-    let partition_stats = commit_metadata
-        .get("partitionToWriteStats")
-        .and_then(|v| v.as_object())
-        .ok_or_else(|| {
-            CoreError::CommitMetadata("Invalid or missing 
partitionToWriteStats object".into())
-        })?;
+    let metadata = HoodieCommitMetadata::from_json_map(commit_metadata)?;
 
     let mut file_groups = HashSet::new();
 
-    for (partition, write_stats_array) in partition_stats {
-        let write_stats = write_stats_array
-            .as_array()
-            .ok_or_else(|| CoreError::CommitMetadata("Invalid write stats 
array".into()))?;
-
-        for stat in write_stats {
-            let file_id = stat
-                .get("fileId")
-                .and_then(|v| v.as_str())
-                .ok_or_else(|| CoreError::CommitMetadata("Invalid fileId in 
write stats".into()))?;
-
-            let mut file_group = FileGroup::new(file_id.to_string(), 
partition.clone());
-
-            if let Some(base_file_name) = stat.get("baseFile") {
-                let base_file_name = base_file_name
-                    .as_str()
-                    .ok_or_else(|| CoreError::CommitMetadata("Invalid base 
file name".into()))?;
-                file_group.add_base_file_from_name(base_file_name)?;
-
-                if let Some(log_file_names) = stat.get("logFiles") {
-                    let log_file_names = 
log_file_names.as_array().ok_or_else(|| {
-                        CoreError::CommitMetadata("Invalid log files 
array".into())
-                    })?;
-                    for log_file_name in log_file_names {
-                        let log_file_name = 
log_file_name.as_str().ok_or_else(|| {
-                            CoreError::CommitMetadata("Invalid log file 
name".into())
-                        })?;
-                        file_group.add_log_file_from_name(log_file_name)?;
-                    }
-                } else {
-                    return Err(CoreError::CommitMetadata(
-                        "Missing log files in write stats".into(),
-                    ));
-                }
-            } else {
-                let path = stat.get("path").and_then(|v| 
v.as_str()).ok_or_else(|| {
-                    CoreError::CommitMetadata("Invalid path in write 
stats".into())
-                })?;
+    for (partition, write_stat) in metadata.iter_write_stats() {
+        let file_id = write_stat
+            .file_id
+            .as_ref()
+            .ok_or_else(|| CoreError::CommitMetadata("Missing fileId in write 
stats".into()))?;
 
-                let file_name = Path::new(path)
-                    .file_name()
-                    .and_then(|name| name.to_str())
-                    .ok_or_else(|| CoreError::CommitMetadata("Invalid file 
name in path".into()))?;
+        let mut file_group = FileGroup::new(file_id.clone(), 
partition.clone());
 
-                file_group.add_base_file_from_name(file_name)?;
-            }
+        // Handle two cases:
+        // 1. MOR table with baseFile and logFiles
+        // 2. COW table with path only
+        if let Some(base_file_name) = &write_stat.base_file {
+            file_group.add_base_file_from_name(base_file_name)?;
 
-            file_groups.insert(file_group);
+            if let Some(log_file_names) = &write_stat.log_files {
+                for log_file_name in log_file_names {
+                    file_group.add_log_file_from_name(log_file_name)?;
+                }
+            } else {
+                return Err(CoreError::CommitMetadata(
+                    "Missing log files in write stats".into(),
+                ));
+            }
+        } else {
+            let path = write_stat
+                .path
+                .as_ref()
+                .ok_or_else(|| CoreError::CommitMetadata("Missing path in 
write stats".into()))?;
+
+            let file_name = Path::new(path)
+                .file_name()
+                .and_then(|name| name.to_str())
+                .ok_or_else(|| CoreError::CommitMetadata("Invalid file name in 
path".into()))?;
+
+            file_group.add_base_file_from_name(file_name)?;
         }
+
+        file_groups.insert(file_group);
     }
 
     Ok(file_groups)
@@ -113,28 +99,14 @@ pub fn build_file_groups(commit_metadata: &Map<String, 
Value>) -> Result<HashSet
 pub fn build_replaced_file_groups(
     commit_metadata: &Map<String, Value>,
 ) -> Result<HashSet<FileGroup>> {
-    let partition_to_replaced = commit_metadata
-        .get("partitionToReplaceFileIds")
-        .and_then(|v| v.as_object())
-        .ok_or_else(|| {
-            CoreError::CommitMetadata("Invalid or missing 
partitionToReplaceFileIds object".into())
-        })?;
+    // Replace commits follow HoodieReplaceCommitMetadata schema; parse with 
the dedicated type
+    let metadata = 
HoodieReplaceCommitMetadata::from_json_map(commit_metadata)?;
 
     let mut file_groups = HashSet::new();
 
-    for (partition, file_ids_value) in partition_to_replaced {
-        let file_ids = file_ids_value
-            .as_array()
-            .ok_or_else(|| CoreError::CommitMetadata("Invalid file group ids 
array".into()))?;
-
-        for file_id in file_ids {
-            let id = file_id
-                .as_str()
-                .ok_or_else(|| CoreError::CommitMetadata("Invalid file group 
id string".into()))?;
-
-            let file_group = FileGroup::new(id.to_string(), partition.clone());
-            file_groups.insert(file_group);
-        }
+    for (partition, file_id) in metadata.iter_replace_file_ids() {
+        let file_group = FileGroup::new(file_id.clone(), partition.clone());
+        file_groups.insert(file_group);
     }
 
     Ok(file_groups)
@@ -143,6 +115,37 @@ pub fn build_replaced_file_groups(
 #[cfg(test)]
 mod tests {
 
+    mod test_file_group_merger {
+        use super::super::*;
+        use crate::file_group::FileGroup;
+
+        #[test]
+        fn test_merge_file_groups() {
+            let mut existing = HashSet::new();
+            let fg1 = FileGroup::new("file1".to_string(), "p1".to_string());
+            existing.insert(fg1);
+
+            let new_groups = vec![
+                FileGroup::new("file2".to_string(), "p1".to_string()),
+                FileGroup::new("file3".to_string(), "p2".to_string()),
+            ];
+
+            existing.merge(new_groups).unwrap();
+            assert_eq!(existing.len(), 3);
+        }
+
+        #[test]
+        fn test_merge_empty() {
+            let mut existing = HashSet::new();
+            let fg1 = FileGroup::new("file1".to_string(), "p1".to_string());
+            existing.insert(fg1);
+
+            let new_groups: Vec<FileGroup> = vec![];
+            existing.merge(new_groups).unwrap();
+            assert_eq!(existing.len(), 1);
+        }
+    }
+
     mod test_build_file_groups {
         use super::super::*;
         use serde_json::{json, Map, Value};
@@ -158,9 +161,10 @@ mod tests {
             .clone();
 
             let result = build_file_groups(&metadata);
-            assert!(matches!(result,
-                Err(CoreError::CommitMetadata(msg))
-                if msg == "Invalid or missing partitionToWriteStats object"));
+            // With the new implementation, this returns Ok with an empty 
HashSet
+            // because iter_write_stats() returns an empty iterator when 
partition_to_write_stats is None
+            assert!(result.is_ok());
+            assert_eq!(result.unwrap().len(), 0);
         }
 
         #[test]
@@ -177,7 +181,7 @@ mod tests {
             let result = build_file_groups(&metadata);
             assert!(matches!(
                 result,
-                Err(CoreError::CommitMetadata(msg)) if msg == "Invalid write 
stats array"
+                Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to 
parse commit metadata")
             ));
         }
 
@@ -197,7 +201,7 @@ mod tests {
             let result = build_file_groups(&metadata);
             assert!(matches!(
                 result,
-                Err(CoreError::CommitMetadata(msg)) if msg == "Invalid fileId 
in write stats"
+                Err(CoreError::CommitMetadata(msg)) if msg == "Missing fileId 
in write stats"
             ));
         }
 
@@ -217,7 +221,7 @@ mod tests {
             let result = build_file_groups(&metadata);
             assert!(matches!(
                 result,
-                Err(CoreError::CommitMetadata(msg)) if msg == "Invalid path in 
write stats"
+                Err(CoreError::CommitMetadata(msg)) if msg == "Missing path in 
write stats"
             ));
         }
 
@@ -257,9 +261,10 @@ mod tests {
             .clone();
 
             let result = build_file_groups(&metadata);
+            // Serde will fail to parse this and return a deserialization error
             assert!(matches!(
                 result,
-                Err(CoreError::CommitMetadata(msg)) if msg == "Invalid fileId 
in write stats"
+                Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to 
parse commit metadata")
             ));
         }
 
@@ -278,9 +283,10 @@ mod tests {
             .clone();
 
             let result = build_file_groups(&metadata);
+            // Serde will fail to parse this and return a deserialization error
             assert!(matches!(
                 result,
-                Err(CoreError::CommitMetadata(msg)) if msg == "Invalid path in 
write stats"
+                Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to 
parse commit metadata")
             ));
         }
 
@@ -331,10 +337,10 @@ mod tests {
             .clone();
 
             let result = build_replaced_file_groups(&metadata);
-            assert!(matches!(
-                result,
-                Err(CoreError::CommitMetadata(msg)) if msg == "Invalid or 
missing partitionToReplaceFileIds object"
-            ));
+            // With the new implementation, this returns Ok with an empty 
HashSet
+            // because iter_replace_file_ids() returns an empty iterator when 
partition_to_replace_file_ids is None
+            assert!(result.is_ok());
+            assert_eq!(result.unwrap().len(), 0);
         }
 
         #[test]
@@ -351,7 +357,7 @@ mod tests {
             let result = build_replaced_file_groups(&metadata);
             assert!(matches!(
                 result,
-                Err(CoreError::CommitMetadata(msg)) if msg == "Invalid file 
group ids array"
+                Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to 
parse commit metadata")
             ));
         }
 
@@ -367,9 +373,10 @@ mod tests {
             .clone();
 
             let result = build_replaced_file_groups(&metadata);
+            // Serde will fail to parse this
             assert!(matches!(
                 result,
-                Err(CoreError::CommitMetadata(msg)) if msg == "Invalid file 
group id string"
+                Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to 
parse commit metadata")
             ));
         }
 
@@ -385,9 +392,10 @@ mod tests {
             .clone();
 
             let result = build_replaced_file_groups(&metadata);
+            // Serde will fail to parse this
             assert!(matches!(
                 result,
-                Err(CoreError::CommitMetadata(msg)) if msg == "Invalid file 
group id string"
+                Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to 
parse commit metadata")
             ));
         }
 
diff --git a/crates/core/src/metadata/commit.rs 
b/crates/core/src/metadata/commit.rs
new file mode 100644
index 0000000..579874b
--- /dev/null
+++ b/crates/core/src/metadata/commit.rs
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::CoreError;
+use crate::Result;
+use apache_avro_derive::AvroSchema as DeriveAvroSchema;
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+use std::collections::HashMap;
+
+/// Represents statistics for a single file write operation in a commit
+///
+/// This struct is automatically derived to/from Avro schema using 
apache-avro-derive.
+/// The Avro schema can be accessed via `HoodieWriteStat::get_schema()`.
+#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase")]
+#[avro(namespace = "org.apache.hudi.avro.model")]
+pub struct HoodieWriteStat {
+    #[avro(rename = "fileId")]
+    pub file_id: Option<String>,
+    pub path: Option<String>,
+    #[avro(rename = "baseFile")]
+    pub base_file: Option<String>,
+    #[avro(rename = "logFiles")]
+    pub log_files: Option<Vec<String>>,
+    #[avro(rename = "prevCommit")]
+    pub prev_commit: Option<String>,
+    #[avro(rename = "numWrites")]
+    pub num_writes: Option<i64>,
+    #[avro(rename = "numDeletes")]
+    pub num_deletes: Option<i64>,
+    #[avro(rename = "numUpdateWrites")]
+    pub num_update_writes: Option<i64>,
+    #[avro(rename = "numInserts")]
+    pub num_inserts: Option<i64>,
+    #[avro(rename = "totalWriteBytes")]
+    pub total_write_bytes: Option<i64>,
+    #[avro(rename = "totalWriteErrors")]
+    pub total_write_errors: Option<i64>,
+}
+
+/// Represents the metadata for a Hudi commit
+///
+/// This struct is automatically derived to/from Avro schema using 
apache-avro-derive.
+/// The Avro schema can be accessed via `HoodieCommitMetadata::get_schema()`.
+///
+/// # Example
+/// ```
+/// use hudi_core::metadata::commit::HoodieCommitMetadata;
+///
+/// // Get the Avro schema
+/// let schema = HoodieCommitMetadata::get_schema();
+/// println!("Schema: {}", schema.canonical_form());
+/// ```
+#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase")]
+#[avro(namespace = "org.apache.hudi.avro.model")]
+pub struct HoodieCommitMetadata {
+    pub version: Option<i32>,
+    #[avro(rename = "operationType")]
+    pub operation_type: Option<String>,
+    #[avro(rename = "partitionToWriteStats")]
+    pub partition_to_write_stats: Option<HashMap<String, 
Vec<HoodieWriteStat>>>,
+    #[avro(rename = "partitionToReplaceFileIds")]
+    pub partition_to_replace_file_ids: Option<HashMap<String, Vec<String>>>,
+    pub compacted: Option<bool>,
+    #[avro(rename = "extraMetadata")]
+    pub extra_metadata: Option<HashMap<String, String>>,
+}
+
+impl HoodieCommitMetadata {
+    /// Parse commit metadata from a serde_json Map
+    pub fn from_json_map(map: &Map<String, Value>) -> Result<Self> {
+        serde_json::from_value(Value::Object(map.clone())).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to parse commit 
metadata: {}", e))
+        })
+    }
+
+    /// Parse commit metadata from JSON bytes
+    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
+        serde_json::from_slice(bytes).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to parse commit 
metadata: {}", e))
+        })
+    }
+
+    /// Get the write stats for a specific partition
+    pub fn get_partition_write_stats(&self, partition: &str) -> 
Option<&Vec<HoodieWriteStat>> {
+        self.partition_to_write_stats
+            .as_ref()
+            .and_then(|stats| stats.get(partition))
+    }
+
+    /// Get all partitions with write stats
+    pub fn get_partitions_with_writes(&self) -> Vec<String> {
+        self.partition_to_write_stats
+            .as_ref()
+            .map(|stats| stats.keys().cloned().collect())
+            .unwrap_or_default()
+    }
+
+    /// Get the file IDs to be replaced for a specific partition
+    pub fn get_partition_replace_file_ids(&self, partition: &str) -> 
Option<&Vec<String>> {
+        self.partition_to_replace_file_ids
+            .as_ref()
+            .and_then(|ids| ids.get(partition))
+    }
+
+    /// Get all partitions with file replacements
+    pub fn get_partitions_with_replacements(&self) -> Vec<String> {
+        self.partition_to_replace_file_ids
+            .as_ref()
+            .map(|ids| ids.keys().cloned().collect())
+            .unwrap_or_default()
+    }
+
+    /// Iterate over all write stats across all partitions
+    pub fn iter_write_stats(&self) -> impl Iterator<Item = (&String, 
&HoodieWriteStat)> {
+        self.partition_to_write_stats
+            .as_ref()
+            .into_iter()
+            .flat_map(|stats| {
+                stats.iter().flat_map(|(partition, write_stats)| {
+                    write_stats.iter().map(move |stat| (partition, stat))
+                })
+            })
+    }
+
+    /// Iterate over all replace file IDs across all partitions
+    pub fn iter_replace_file_ids(&self) -> impl Iterator<Item = (&String, 
&String)> {
+        self.partition_to_replace_file_ids
+            .as_ref()
+            .into_iter()
+            .flat_map(|replace_ids| {
+                replace_ids.iter().flat_map(|(partition, file_ids)| {
+                    file_ids.iter().map(move |file_id| (partition, file_id))
+                })
+            })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use apache_avro::{AvroSchema, Schema};
+    use serde_json::json;
+
+    #[test]
+    fn test_parse_commit_metadata() {
+        let json = json!({
+            "version": 1,
+            "operationType": "UPSERT",
+            "partitionToWriteStats": {
+                "byteField=20/shortField=100": [{
+                    "fileId": "bb7c3a45-387f-490d-aab2-981c3f1a8ada-0",
+                    "path": 
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
+                    "numWrites": 100,
+                    "totalWriteBytes": 1024
+                }]
+            },
+            "compacted": false
+        });
+
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+        assert_eq!(metadata.version, Some(1));
+        assert_eq!(metadata.operation_type, Some("UPSERT".to_string()));
+        assert!(metadata.partition_to_write_stats.is_some());
+
+        let stats = metadata
+            .get_partition_write_stats("byteField=20/shortField=100")
+            .unwrap();
+        assert_eq!(stats.len(), 1);
+        assert_eq!(
+            stats[0].file_id,
+            Some("bb7c3a45-387f-490d-aab2-981c3f1a8ada-0".to_string())
+        );
+    }
+
+    #[test]
+    fn test_parse_replace_file_ids() {
+        let json = json!({
+            "partitionToReplaceFileIds": {
+                "30": ["d398fae1-c0e6-4098-8124-f55f7098bdba-0"],
+                "20": ["88163884-fef0-4aab-865d-c72327a8a1d5-0"]
+            }
+        });
+
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+
+        let file_ids = metadata.get_partition_replace_file_ids("30").unwrap();
+        assert_eq!(file_ids.len(), 1);
+        assert_eq!(file_ids[0], "d398fae1-c0e6-4098-8124-f55f7098bdba-0");
+    }
+
+    #[test]
+    fn test_iter_write_stats() {
+        let json = json!({
+            "partitionToWriteStats": {
+                "p1": [{
+                    "fileId": "file1",
+                    "path": "p1/file1.parquet"
+                }],
+                "p2": [{
+                    "fileId": "file2",
+                    "path": "p2/file2.parquet"
+                }]
+            }
+        });
+
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+        let count = metadata.iter_write_stats().count();
+        assert_eq!(count, 2);
+    }
+
+    #[test]
+    fn test_avro_schema_generation() {
+        // Test that the derived Avro schema can be generated
+        let schema = HoodieCommitMetadata::get_schema();
+
+        // Verify it's a record type
+        if let Schema::Record(ref record) = schema {
+            assert_eq!(record.name.name, "HoodieCommitMetadata");
+            assert_eq!(
+                record.name.namespace,
+                Some("org.apache.hudi.avro.model".to_string())
+            );
+
+            // Verify key fields exist
+            let field_names: Vec<_> = record.fields.iter().map(|f| 
f.name.as_str()).collect();
+            assert!(field_names.contains(&"version"));
+            assert!(field_names.contains(&"operationType"));
+            assert!(field_names.contains(&"partitionToWriteStats"));
+            assert!(field_names.contains(&"partitionToReplaceFileIds"));
+            assert!(field_names.contains(&"compacted"));
+            assert!(field_names.contains(&"extraMetadata"));
+        } else {
+            panic!("Expected Record schema");
+        }
+
+        // Print schema for verification (useful for debugging)
+        println!("Generated Avro Schema:\n{}", schema.canonical_form());
+    }
+
+    #[test]
+    fn test_write_stat_avro_schema() {
+        // Test that HoodieWriteStat also has proper Avro schema
+        let schema = HoodieWriteStat::get_schema();
+
+        if let Schema::Record(ref record) = schema {
+            assert_eq!(record.name.name, "HoodieWriteStat");
+            assert_eq!(
+                record.name.namespace,
+                Some("org.apache.hudi.avro.model".to_string())
+            );
+
+            let field_names: Vec<_> = record.fields.iter().map(|f| 
f.name.as_str()).collect();
+            assert!(field_names.contains(&"fileId"));
+            assert!(field_names.contains(&"path"));
+            assert!(field_names.contains(&"baseFile"));
+            assert!(field_names.contains(&"logFiles"));
+        } else {
+            panic!("Expected Record schema");
+        }
+    }
+
+    #[test]
+    fn test_from_json_bytes() {
+        let json_str = r#"{
+            "version": 1,
+            "operationType": "UPSERT",
+            "partitionToWriteStats": {
+                "p1": [{
+                    "fileId": "file1",
+                    "path": "p1/file1.parquet"
+                }]
+            },
+            "compacted": false
+        }"#;
+
+        let metadata = 
HoodieCommitMetadata::from_json_bytes(json_str.as_bytes()).unwrap();
+        assert_eq!(metadata.version, Some(1));
+        assert_eq!(metadata.operation_type, Some("UPSERT".to_string()));
+    }
+
+    #[test]
+    fn test_from_json_bytes_invalid() {
+        let invalid_json = b"invalid json";
+        let result = HoodieCommitMetadata::from_json_bytes(invalid_json);
+        assert!(result.is_err());
+        assert!(matches!(result, Err(CoreError::CommitMetadata(_))));
+    }
+
+    #[test]
+    fn test_get_partition_write_stats() {
+        let json = json!({
+            "partitionToWriteStats": {
+                "p1": [{
+                    "fileId": "file1",
+                    "path": "p1/file1.parquet"
+                }],
+                "p2": [{
+                    "fileId": "file2",
+                    "path": "p2/file2.parquet"
+                }]
+            }
+        });
+
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+
+        // Test getting existing partition
+        let p1_stats = metadata.get_partition_write_stats("p1").unwrap();
+        assert_eq!(p1_stats.len(), 1);
+        assert_eq!(p1_stats[0].file_id, Some("file1".to_string()));
+
+        // Test getting non-existent partition
+        assert!(metadata.get_partition_write_stats("p3").is_none());
+    }
+
+    #[test]
+    fn test_get_partitions_with_writes() {
+        let json = json!({
+            "partitionToWriteStats": {
+                "p1": [{
+                    "fileId": "file1",
+                    "path": "p1/file1.parquet"
+                }],
+                "p2": [{
+                    "fileId": "file2",
+                    "path": "p2/file2.parquet"
+                }]
+            }
+        });
+
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+        let mut partitions = metadata.get_partitions_with_writes();
+        partitions.sort();
+        assert_eq!(partitions, vec!["p1".to_string(), "p2".to_string()]);
+    }
+
+    #[test]
+    fn test_get_partitions_with_writes_empty() {
+        let json = json!({});
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+        let partitions = metadata.get_partitions_with_writes();
+        assert_eq!(partitions.len(), 0);
+    }
+
+    #[test]
+    fn test_get_partition_replace_file_ids() {
+        let json = json!({
+            "partitionToReplaceFileIds": {
+                "30": ["d398fae1-c0e6-4098-8124-f55f7098bdba-0"],
+                "20": ["88163884-fef0-4aab-865d-c72327a8a1d5-0"]
+            }
+        });
+
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+
+        // Test getting existing partition
+        let file_ids_30 = 
metadata.get_partition_replace_file_ids("30").unwrap();
+        assert_eq!(file_ids_30.len(), 1);
+        assert_eq!(file_ids_30[0], "d398fae1-c0e6-4098-8124-f55f7098bdba-0");
+
+        // Test getting non-existent partition
+        assert!(metadata.get_partition_replace_file_ids("40").is_none());
+    }
+
+    #[test]
+    fn test_get_partitions_with_replacements() {
+        let json = json!({
+            "partitionToReplaceFileIds": {
+                "30": ["file1"],
+                "20": ["file2"]
+            }
+        });
+
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+        let mut partitions = metadata.get_partitions_with_replacements();
+        partitions.sort();
+        assert_eq!(partitions, vec!["20".to_string(), "30".to_string()]);
+    }
+
+    #[test]
+    fn test_get_partitions_with_replacements_empty() {
+        let json = json!({});
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+        let partitions = metadata.get_partitions_with_replacements();
+        assert_eq!(partitions.len(), 0);
+    }
+
+    #[test]
+    fn test_iter_replace_file_ids() {
+        let json = json!({
+            "partitionToReplaceFileIds": {
+                "p1": ["file1", "file2"],
+                "p2": ["file3"]
+            }
+        });
+
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+        let count = metadata.iter_replace_file_ids().count();
+        assert_eq!(count, 3);
+
+        let file_ids: Vec<_> = metadata
+            .iter_replace_file_ids()
+            .map(|(_, file_id)| file_id.as_str())
+            .collect();
+        assert!(file_ids.contains(&"file1"));
+        assert!(file_ids.contains(&"file2"));
+        assert!(file_ids.contains(&"file3"));
+    }
+
+    #[test]
+    fn test_iter_replace_file_ids_empty() {
+        let json = json!({});
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+        let count = metadata.iter_replace_file_ids().count();
+        assert_eq!(count, 0);
+    }
+}
diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs
index e7ae3b1..1385fbb 100644
--- a/crates/core/src/metadata/mod.rs
+++ b/crates/core/src/metadata/mod.rs
@@ -16,7 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+pub mod commit;
 pub mod meta_field;
+pub mod replace_commit;
 
 pub const HUDI_METADATA_DIR: &str = ".hoodie";
 pub const DELTALAKE_METADATA_DIR: &str = "_delta_log";
diff --git a/crates/core/src/metadata/replace_commit.rs 
b/crates/core/src/metadata/replace_commit.rs
new file mode 100644
index 0000000..b1c7812
--- /dev/null
+++ b/crates/core/src/metadata/replace_commit.rs
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::CoreError;
+use crate::metadata::commit::HoodieWriteStat;
+use crate::Result;
+use apache_avro_derive::AvroSchema as DeriveAvroSchema;
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+use std::collections::HashMap;
+
/// Represents the metadata for a Hudi Replace Commit.
///
/// This is modeled from HoodieReplaceCommitMetadata.avsc. All fields are
/// `Option` so that partial JSON payloads (including `{}`) deserialize
/// cleanly; JSON field names use camelCase via the serde rename rule.
#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
#[serde(rename_all = "camelCase")]
#[avro(namespace = "org.apache.hudi.avro.model")]
pub struct HoodieReplaceCommitMetadata {
    // version: ["int","null"] with default 1 in Avro; we model as Option<i32>
    pub version: Option<i32>,
    /// Write operation type, e.g. "REPLACE_COMMIT" (as exercised by the
    /// tests) — NOTE(review): confirm the full value set against the .avsc.
    #[avro(rename = "operationType")]
    pub operation_type: Option<String>,
    /// Per-partition write statistics, keyed by partition path.
    #[avro(rename = "partitionToWriteStats")]
    pub partition_to_write_stats: Option<HashMap<String, Vec<HoodieWriteStat>>>,
    // NOTE(review): presumably marks a compaction-produced commit; confirm
    // against the .avsc before relying on it.
    pub compacted: Option<bool>,
    /// Free-form key/value metadata attached to the commit.
    #[avro(rename = "extraMetadata")]
    pub extra_metadata: Option<HashMap<String, String>>,
    /// Partition path -> IDs of file groups replaced by this commit;
    /// consumed by `iter_replace_file_ids`.
    #[avro(rename = "partitionToReplaceFileIds")]
    pub partition_to_replace_file_ids: Option<HashMap<String, Vec<String>>>,
}
+
+impl HoodieReplaceCommitMetadata {
+    /// Parse replace commit metadata from a serde_json Map
+    pub fn from_json_map(map: &Map<String, Value>) -> Result<Self> {
+        serde_json::from_value(Value::Object(map.clone())).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to parse commit 
metadata: {}", e))
+        })
+    }
+
+    /// Parse replace commit metadata from JSON bytes
+    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
+        serde_json::from_slice(bytes).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to parse commit 
metadata: {}", e))
+        })
+    }
+
+    /// Iterate over all replace file IDs across all partitions
+    pub fn iter_replace_file_ids(&self) -> impl Iterator<Item = (&String, 
&String)> {
+        self.partition_to_replace_file_ids
+            .as_ref()
+            .into_iter()
+            .flat_map(|replace_ids| {
+                replace_ids.iter().flat_map(|(partition, file_ids)| {
+                    file_ids.iter().map(move |file_id| (partition, file_id))
+                })
+            })
+    }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_parse_replace_commit() {
        let value = json!({
            "partitionToReplaceFileIds": {
                "30": ["a-0"],
                "20": ["b-0", "b-1"],
                "": ["c-0"]
            },
            "extraMetadata": {"k":"v"},
            "version": 1,
            "operationType": "REPLACE_COMMIT"
        });

        let metadata: HoodieReplaceCommitMetadata = serde_json::from_value(value).unwrap();
        // 1 + 2 + 1 file IDs across the three partitions (incl. the empty one).
        assert_eq!(metadata.iter_replace_file_ids().count(), 4);
    }

    #[test]
    fn test_from_json_bytes() {
        let json_str = r#"{
            "partitionToReplaceFileIds": {
                "30": ["a-0"],
                "20": ["b-0"]
            },
            "version": 1,
            "operationType": "REPLACE_COMMIT"
        }"#;

        let metadata = HoodieReplaceCommitMetadata::from_json_bytes(json_str.as_bytes())
            .expect("valid JSON should parse");
        assert_eq!(metadata.version, Some(1));
        assert_eq!(metadata.operation_type.as_deref(), Some("REPLACE_COMMIT"));
    }

    #[test]
    fn test_from_json_bytes_invalid() {
        // Garbage bytes must surface as a CommitMetadata error.
        let result = HoodieReplaceCommitMetadata::from_json_bytes(b"invalid json");
        assert!(matches!(result, Err(CoreError::CommitMetadata(_))));
    }

    #[test]
    fn test_iter_replace_file_ids_empty() {
        let metadata: HoodieReplaceCommitMetadata =
            serde_json::from_value(json!({})).unwrap();
        assert_eq!(metadata.iter_replace_file_ids().count(), 0);
    }
}
diff --git a/crates/core/src/schema/resolver.rs 
b/crates/core/src/schema/resolver.rs
index 803f8c6..e18ba1f 100644
--- a/crates/core/src/schema/resolver.rs
+++ b/crates/core/src/schema/resolver.rs
@@ -19,6 +19,7 @@
 use crate::avro_to_arrow::to_arrow_schema;
 use crate::config::table::HudiTableConfig;
 use crate::error::{CoreError, Result};
+use crate::metadata::commit::HoodieCommitMetadata;
 use crate::schema::prepend_meta_fields;
 use crate::storage::Storage;
 use crate::table::Table;
@@ -121,55 +122,40 @@ async fn resolve_schema_from_base_file(
     commit_metadata: &Map<String, Value>,
     storage: Arc<Storage>,
 ) -> Result<Schema> {
-    let first_partition = commit_metadata
-        .get("partitionToWriteStats")
-        .and_then(|v| v.as_object());
+    let metadata = HoodieCommitMetadata::from_json_map(commit_metadata)?;
 
-    let partition_path = first_partition
-        .and_then(|obj| obj.keys().next())
-        .ok_or_else(|| {
+    // Get the first write stat from any partition
+    let (partition, first_stat) = 
metadata.iter_write_stats().next().ok_or_else(|| {
+        CoreError::CommitMetadata(
+            "Failed to resolve the latest schema: no write status in commit 
metadata".to_string(),
+        )
+    })?;
+
+    // Try to get the base file path from either 'path' or 'baseFile' field
+    if let Some(path) = &first_stat.path {
+        if path.ends_with(".parquet") {
+            return Ok(storage.get_parquet_file_schema(path).await?);
+        }
+    }
+
+    // Handle deltacommit case with baseFile
+    if let Some(base_file) = &first_stat.base_file {
+        let parquet_file_path_buf = PathBuf::from_str(partition)
+            .map_err(|e| {
+                CoreError::CommitMetadata(format!("Failed to resolve the 
latest schema: {}", e))
+            })?
+            .join(base_file);
+        let path = parquet_file_path_buf.to_str().ok_or_else(|| {
             CoreError::CommitMetadata(
-                "Failed to resolve the latest schema: no write status in 
commit metadata"
-                    .to_string(),
+                "Failed to resolve the latest schema: invalid file 
path".to_string(),
             )
         })?;
-
-    let first_value = first_partition
-        .and_then(|obj| obj.values().next())
-        .and_then(|value| value.as_array())
-        .and_then(|arr| arr.first());
-
-    let base_file_path = first_value.and_then(|v| v["path"].as_str());
-    match base_file_path {
-        Some(path) if path.ends_with(".parquet") => {
-            Ok(storage.get_parquet_file_schema(path).await?)
-        }
-        Some(_) => {
-            // deltacommit case
-            // TODO: properly parse deltacommit structure
-            let base_file = first_value
-                .and_then(|v| v["baseFile"].as_str())
-                .ok_or_else(|| {
-                    CoreError::CommitMetadata(
-                        "Failed to resolve the latest schema: no file path 
found".to_string(),
-                    )
-                })?;
-            let parquet_file_path_buf = PathBuf::from_str(partition_path)
-                .map_err(|e| {
-                    CoreError::CommitMetadata(format!("Failed to resolve the 
latest schema: {}", e))
-                })?
-                .join(base_file);
-            let path = parquet_file_path_buf.to_str().ok_or_else(|| {
-                CoreError::CommitMetadata(
-                    "Failed to resolve the latest schema: invalid file 
path".to_string(),
-                )
-            })?;
-            Ok(storage.get_parquet_file_schema(path).await?)
-        }
-        None => Err(CoreError::CommitMetadata(
-            "Failed to resolve the latest schema: no file path 
found".to_string(),
-        )),
+        return Ok(storage.get_parquet_file_schema(path).await?);
     }
+
+    Err(CoreError::CommitMetadata(
+        "Failed to resolve the latest schema: no file path found".to_string(),
+    ))
 }
 
 fn sanitize_avro_schema_str(avro_schema_str: &str) -> String {
@@ -196,3 +182,89 @@ fn extract_avro_schema_from_commit_metadata(
                 .map(|s| s.to_string())
         })
 }
+
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Converts a `json!` value into the `Map` form the resolver functions take.
    fn to_map(value: Value) -> Map<String, Value> {
        value
            .as_object()
            .expect("test fixture must be a JSON object")
            .clone()
    }

    #[test]
    fn test_resolve_avro_schema_from_commit_metadata_with_schema() {
        let metadata = to_map(json!({
            "extraMetadata": {
                "schema": r#"{"type":"record","name":"TestRecord","fields":[{"name":"id","type":"int"}]}"#
            }
        }));

        let schema = resolve_avro_schema_from_commit_metadata(&metadata)
            .expect("schema should resolve from extraMetadata");
        assert!(schema.contains("TestRecord"));
    }

    #[test]
    fn test_resolve_avro_schema_from_commit_metadata_empty() {
        // A completely empty map yields a CommitMetadata error.
        let result = resolve_avro_schema_from_commit_metadata(&Map::new());
        assert!(matches!(result, Err(CoreError::CommitMetadata(_))));
    }

    #[test]
    fn test_resolve_avro_schema_from_commit_metadata_no_schema() {
        // extraMetadata exists but has no "schema" key -> SchemaNotFound.
        let metadata = to_map(json!({
            "extraMetadata": {
                "other": "value"
            }
        }));

        let result = resolve_avro_schema_from_commit_metadata(&metadata);
        assert!(matches!(result, Err(CoreError::SchemaNotFound(_))));
    }

    #[test]
    fn test_sanitize_avro_schema_str() {
        // Escaped colons are unescaped.
        assert_eq!(sanitize_avro_schema_str(r#"test\:schema"#), "test:schema");
        // Surrounding whitespace is trimmed.
        assert_eq!(sanitize_avro_schema_str("  test schema  "), "test schema");
    }

    #[test]
    fn test_extract_avro_schema_from_commit_metadata() {
        let metadata = to_map(json!({
            "extraMetadata": {
                "schema": "test_schema"
            }
        }));

        assert_eq!(
            extract_avro_schema_from_commit_metadata(&metadata),
            Some("test_schema".to_string())
        );
    }

    #[test]
    fn test_extract_avro_schema_from_commit_metadata_none() {
        let metadata = to_map(json!({
            "other": "value"
        }));

        assert_eq!(extract_avro_schema_from_commit_metadata(&metadata), None);
    }
}


Reply via email to