This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new f27feb8 refactor: define avro model for parsing commit metadata (#477)
f27feb8 is described below
commit f27feb8ceed468063e5eec716a768b2f086977df
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Oct 28 21:19:56 2025 -0700
refactor: define avro model for parsing commit metadata (#477)
---
Cargo.toml | 3 +-
crates/core/Cargo.toml | 1 +
crates/core/src/file_group/builder.rs | 178 ++++++------
crates/core/src/metadata/commit.rs | 435 +++++++++++++++++++++++++++++
crates/core/src/metadata/mod.rs | 2 +
crates/core/src/metadata/replace_commit.rs | 130 +++++++++
crates/core/src/schema/resolver.rs | 160 ++++++++---
7 files changed, 779 insertions(+), 130 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 595aeda..bc991d1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -51,7 +51,8 @@ object_store = { version = "~0.11.2", features = ["aws", "azure", "gcp"] }
parquet = { version = "~54.2.0", features = ["async", "object_store"] }
# avro
-apache-avro = { version = "~0.17.0" }
+apache-avro = { version = "~0.17.0", features = ["derive"] }
+apache-avro-derive = { version = "~0.17.0" }
# datafusion
datafusion = { version = "~46.0.0" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 7cd88af..e8bde12 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -45,6 +45,7 @@ parquet = { workspace = true }
# avro
apache-avro = { workspace = true }
+apache-avro-derive = { workspace = true }
# serde
serde = { workspace = true }
diff --git a/crates/core/src/file_group/builder.rs b/crates/core/src/file_group/builder.rs
index 9f2d0c3..bbc767c 100644
--- a/crates/core/src/file_group/builder.rs
+++ b/crates/core/src/file_group/builder.rs
@@ -18,6 +18,8 @@
*/
use crate::error::CoreError;
use crate::file_group::FileGroup;
+use crate::metadata::commit::HoodieCommitMetadata;
+use crate::metadata::replace_commit::HoodieReplaceCommitMetadata;
use crate::Result;
use serde_json::{Map, Value};
use std::collections::HashSet;
@@ -47,64 +49,48 @@ impl FileGroupMerger for HashSet<FileGroup> {
}
pub fn build_file_groups(commit_metadata: &Map<String, Value>) ->
Result<HashSet<FileGroup>> {
- let partition_stats = commit_metadata
- .get("partitionToWriteStats")
- .and_then(|v| v.as_object())
- .ok_or_else(|| {
- CoreError::CommitMetadata("Invalid or missing
partitionToWriteStats object".into())
- })?;
+ let metadata = HoodieCommitMetadata::from_json_map(commit_metadata)?;
let mut file_groups = HashSet::new();
- for (partition, write_stats_array) in partition_stats {
- let write_stats = write_stats_array
- .as_array()
- .ok_or_else(|| CoreError::CommitMetadata("Invalid write stats
array".into()))?;
-
- for stat in write_stats {
- let file_id = stat
- .get("fileId")
- .and_then(|v| v.as_str())
- .ok_or_else(|| CoreError::CommitMetadata("Invalid fileId in
write stats".into()))?;
-
- let mut file_group = FileGroup::new(file_id.to_string(),
partition.clone());
-
- if let Some(base_file_name) = stat.get("baseFile") {
- let base_file_name = base_file_name
- .as_str()
- .ok_or_else(|| CoreError::CommitMetadata("Invalid base
file name".into()))?;
- file_group.add_base_file_from_name(base_file_name)?;
-
- if let Some(log_file_names) = stat.get("logFiles") {
- let log_file_names =
log_file_names.as_array().ok_or_else(|| {
- CoreError::CommitMetadata("Invalid log files
array".into())
- })?;
- for log_file_name in log_file_names {
- let log_file_name =
log_file_name.as_str().ok_or_else(|| {
- CoreError::CommitMetadata("Invalid log file
name".into())
- })?;
- file_group.add_log_file_from_name(log_file_name)?;
- }
- } else {
- return Err(CoreError::CommitMetadata(
- "Missing log files in write stats".into(),
- ));
- }
- } else {
- let path = stat.get("path").and_then(|v|
v.as_str()).ok_or_else(|| {
- CoreError::CommitMetadata("Invalid path in write
stats".into())
- })?;
+ for (partition, write_stat) in metadata.iter_write_stats() {
+ let file_id = write_stat
+ .file_id
+ .as_ref()
+ .ok_or_else(|| CoreError::CommitMetadata("Missing fileId in write
stats".into()))?;
- let file_name = Path::new(path)
- .file_name()
- .and_then(|name| name.to_str())
- .ok_or_else(|| CoreError::CommitMetadata("Invalid file
name in path".into()))?;
+ let mut file_group = FileGroup::new(file_id.clone(),
partition.clone());
- file_group.add_base_file_from_name(file_name)?;
- }
+ // Handle two cases:
+ // 1. MOR table with baseFile and logFiles
+ // 2. COW table with path only
+ if let Some(base_file_name) = &write_stat.base_file {
+ file_group.add_base_file_from_name(base_file_name)?;
- file_groups.insert(file_group);
+ if let Some(log_file_names) = &write_stat.log_files {
+ for log_file_name in log_file_names {
+ file_group.add_log_file_from_name(log_file_name)?;
+ }
+ } else {
+ return Err(CoreError::CommitMetadata(
+ "Missing log files in write stats".into(),
+ ));
+ }
+ } else {
+ let path = write_stat
+ .path
+ .as_ref()
+ .ok_or_else(|| CoreError::CommitMetadata("Missing path in
write stats".into()))?;
+
+ let file_name = Path::new(path)
+ .file_name()
+ .and_then(|name| name.to_str())
+ .ok_or_else(|| CoreError::CommitMetadata("Invalid file name in
path".into()))?;
+
+ file_group.add_base_file_from_name(file_name)?;
}
+
+ file_groups.insert(file_group);
}
Ok(file_groups)
@@ -113,28 +99,14 @@ pub fn build_file_groups(commit_metadata: &Map<String, Value>) -> Result<HashSet
pub fn build_replaced_file_groups(
commit_metadata: &Map<String, Value>,
) -> Result<HashSet<FileGroup>> {
- let partition_to_replaced = commit_metadata
- .get("partitionToReplaceFileIds")
- .and_then(|v| v.as_object())
- .ok_or_else(|| {
- CoreError::CommitMetadata("Invalid or missing
partitionToReplaceFileIds object".into())
- })?;
+ // Replace commits follow HoodieReplaceCommitMetadata schema; parse with the dedicated type
+ let metadata =
HoodieReplaceCommitMetadata::from_json_map(commit_metadata)?;
let mut file_groups = HashSet::new();
- for (partition, file_ids_value) in partition_to_replaced {
- let file_ids = file_ids_value
- .as_array()
- .ok_or_else(|| CoreError::CommitMetadata("Invalid file group ids
array".into()))?;
-
- for file_id in file_ids {
- let id = file_id
- .as_str()
- .ok_or_else(|| CoreError::CommitMetadata("Invalid file group
id string".into()))?;
-
- let file_group = FileGroup::new(id.to_string(), partition.clone());
- file_groups.insert(file_group);
- }
+ for (partition, file_id) in metadata.iter_replace_file_ids() {
+ let file_group = FileGroup::new(file_id.clone(), partition.clone());
+ file_groups.insert(file_group);
}
Ok(file_groups)
@@ -143,6 +115,37 @@ pub fn build_replaced_file_groups(
#[cfg(test)]
mod tests {
+ mod test_file_group_merger {
+ use super::super::*;
+ use crate::file_group::FileGroup;
+
+ #[test]
+ fn test_merge_file_groups() {
+ let mut existing = HashSet::new();
+ let fg1 = FileGroup::new("file1".to_string(), "p1".to_string());
+ existing.insert(fg1);
+
+ let new_groups = vec![
+ FileGroup::new("file2".to_string(), "p1".to_string()),
+ FileGroup::new("file3".to_string(), "p2".to_string()),
+ ];
+
+ existing.merge(new_groups).unwrap();
+ assert_eq!(existing.len(), 3);
+ }
+
+ #[test]
+ fn test_merge_empty() {
+ let mut existing = HashSet::new();
+ let fg1 = FileGroup::new("file1".to_string(), "p1".to_string());
+ existing.insert(fg1);
+
+ let new_groups: Vec<FileGroup> = vec![];
+ existing.merge(new_groups).unwrap();
+ assert_eq!(existing.len(), 1);
+ }
+ }
+
mod test_build_file_groups {
use super::super::*;
use serde_json::{json, Map, Value};
@@ -158,9 +161,10 @@ mod tests {
.clone();
let result = build_file_groups(&metadata);
- assert!(matches!(result,
- Err(CoreError::CommitMetadata(msg))
- if msg == "Invalid or missing partitionToWriteStats object"));
+ // With the new implementation, this returns Ok with an empty HashSet
+ // because iter_write_stats() returns an empty iterator when partition_to_write_stats is None
+ assert!(result.is_ok());
+ assert_eq!(result.unwrap().len(), 0);
}
#[test]
@@ -177,7 +181,7 @@ mod tests {
let result = build_file_groups(&metadata);
assert!(matches!(
result,
- Err(CoreError::CommitMetadata(msg)) if msg == "Invalid write
stats array"
+ Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to
parse commit metadata")
));
}
@@ -197,7 +201,7 @@ mod tests {
let result = build_file_groups(&metadata);
assert!(matches!(
result,
- Err(CoreError::CommitMetadata(msg)) if msg == "Invalid fileId
in write stats"
+ Err(CoreError::CommitMetadata(msg)) if msg == "Missing fileId
in write stats"
));
}
@@ -217,7 +221,7 @@ mod tests {
let result = build_file_groups(&metadata);
assert!(matches!(
result,
- Err(CoreError::CommitMetadata(msg)) if msg == "Invalid path in
write stats"
+ Err(CoreError::CommitMetadata(msg)) if msg == "Missing path in
write stats"
));
}
@@ -257,9 +261,10 @@ mod tests {
.clone();
let result = build_file_groups(&metadata);
+ // Serde will fail to parse this and return a deserialization error
assert!(matches!(
result,
- Err(CoreError::CommitMetadata(msg)) if msg == "Invalid fileId
in write stats"
+ Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to
parse commit metadata")
));
}
@@ -278,9 +283,10 @@ mod tests {
.clone();
let result = build_file_groups(&metadata);
+ // Serde will fail to parse this and return a deserialization error
assert!(matches!(
result,
- Err(CoreError::CommitMetadata(msg)) if msg == "Invalid path in
write stats"
+ Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to
parse commit metadata")
));
}
@@ -331,10 +337,10 @@ mod tests {
.clone();
let result = build_replaced_file_groups(&metadata);
- assert!(matches!(
- result,
- Err(CoreError::CommitMetadata(msg)) if msg == "Invalid or
missing partitionToReplaceFileIds object"
- ));
+ // With the new implementation, this returns Ok with an empty HashSet
+ // because iter_replace_file_ids() returns an empty iterator when partition_to_replace_file_ids is None
+ assert!(result.is_ok());
+ assert_eq!(result.unwrap().len(), 0);
}
#[test]
@@ -351,7 +357,7 @@ mod tests {
let result = build_replaced_file_groups(&metadata);
assert!(matches!(
result,
- Err(CoreError::CommitMetadata(msg)) if msg == "Invalid file
group ids array"
+ Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to
parse commit metadata")
));
}
@@ -367,9 +373,10 @@ mod tests {
.clone();
let result = build_replaced_file_groups(&metadata);
+ // Serde will fail to parse this
assert!(matches!(
result,
- Err(CoreError::CommitMetadata(msg)) if msg == "Invalid file
group id string"
+ Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to
parse commit metadata")
));
}
@@ -385,9 +392,10 @@ mod tests {
.clone();
let result = build_replaced_file_groups(&metadata);
+ // Serde will fail to parse this
assert!(matches!(
result,
- Err(CoreError::CommitMetadata(msg)) if msg == "Invalid file
group id string"
+ Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to
parse commit metadata")
));
}
diff --git a/crates/core/src/metadata/commit.rs b/crates/core/src/metadata/commit.rs
new file mode 100644
index 0000000..579874b
--- /dev/null
+++ b/crates/core/src/metadata/commit.rs
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::CoreError;
+use crate::Result;
+use apache_avro_derive::AvroSchema as DeriveAvroSchema;
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+use std::collections::HashMap;
+
+/// Represents statistics for a single file write operation in a commit
+///
+/// This struct is automatically derived to/from Avro schema using apache-avro-derive.
+/// The Avro schema can be accessed via `HoodieWriteStat::get_schema()`.
+#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase")]
+#[avro(namespace = "org.apache.hudi.avro.model")]
+pub struct HoodieWriteStat {
+ #[avro(rename = "fileId")]
+ pub file_id: Option<String>,
+ pub path: Option<String>,
+ #[avro(rename = "baseFile")]
+ pub base_file: Option<String>,
+ #[avro(rename = "logFiles")]
+ pub log_files: Option<Vec<String>>,
+ #[avro(rename = "prevCommit")]
+ pub prev_commit: Option<String>,
+ #[avro(rename = "numWrites")]
+ pub num_writes: Option<i64>,
+ #[avro(rename = "numDeletes")]
+ pub num_deletes: Option<i64>,
+ #[avro(rename = "numUpdateWrites")]
+ pub num_update_writes: Option<i64>,
+ #[avro(rename = "numInserts")]
+ pub num_inserts: Option<i64>,
+ #[avro(rename = "totalWriteBytes")]
+ pub total_write_bytes: Option<i64>,
+ #[avro(rename = "totalWriteErrors")]
+ pub total_write_errors: Option<i64>,
+}
+
+/// Represents the metadata for a Hudi commit
+///
+/// This struct is automatically derived to/from Avro schema using apache-avro-derive.
+/// The Avro schema can be accessed via `HoodieCommitMetadata::get_schema()`.
+///
+/// # Example
+/// ```
+/// use hudi_core::metadata::commit::HoodieCommitMetadata;
+///
+/// // Get the Avro schema
+/// let schema = HoodieCommitMetadata::get_schema();
+/// println!("Schema: {}", schema.canonical_form());
+/// ```
+#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase")]
+#[avro(namespace = "org.apache.hudi.avro.model")]
+pub struct HoodieCommitMetadata {
+ pub version: Option<i32>,
+ #[avro(rename = "operationType")]
+ pub operation_type: Option<String>,
+ #[avro(rename = "partitionToWriteStats")]
+ pub partition_to_write_stats: Option<HashMap<String,
Vec<HoodieWriteStat>>>,
+ #[avro(rename = "partitionToReplaceFileIds")]
+ pub partition_to_replace_file_ids: Option<HashMap<String, Vec<String>>>,
+ pub compacted: Option<bool>,
+ #[avro(rename = "extraMetadata")]
+ pub extra_metadata: Option<HashMap<String, String>>,
+}
+
+impl HoodieCommitMetadata {
+ /// Parse commit metadata from a serde_json Map
+ pub fn from_json_map(map: &Map<String, Value>) -> Result<Self> {
+ serde_json::from_value(Value::Object(map.clone())).map_err(|e| {
+ CoreError::CommitMetadata(format!("Failed to parse commit
metadata: {}", e))
+ })
+ }
+
+ /// Parse commit metadata from JSON bytes
+ pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
+ serde_json::from_slice(bytes).map_err(|e| {
+ CoreError::CommitMetadata(format!("Failed to parse commit
metadata: {}", e))
+ })
+ }
+
+ /// Get the write stats for a specific partition
+ pub fn get_partition_write_stats(&self, partition: &str) ->
Option<&Vec<HoodieWriteStat>> {
+ self.partition_to_write_stats
+ .as_ref()
+ .and_then(|stats| stats.get(partition))
+ }
+
+ /// Get all partitions with write stats
+ pub fn get_partitions_with_writes(&self) -> Vec<String> {
+ self.partition_to_write_stats
+ .as_ref()
+ .map(|stats| stats.keys().cloned().collect())
+ .unwrap_or_default()
+ }
+
+ /// Get the file IDs to be replaced for a specific partition
+ pub fn get_partition_replace_file_ids(&self, partition: &str) ->
Option<&Vec<String>> {
+ self.partition_to_replace_file_ids
+ .as_ref()
+ .and_then(|ids| ids.get(partition))
+ }
+
+ /// Get all partitions with file replacements
+ pub fn get_partitions_with_replacements(&self) -> Vec<String> {
+ self.partition_to_replace_file_ids
+ .as_ref()
+ .map(|ids| ids.keys().cloned().collect())
+ .unwrap_or_default()
+ }
+
+ /// Iterate over all write stats across all partitions
+ pub fn iter_write_stats(&self) -> impl Iterator<Item = (&String,
&HoodieWriteStat)> {
+ self.partition_to_write_stats
+ .as_ref()
+ .into_iter()
+ .flat_map(|stats| {
+ stats.iter().flat_map(|(partition, write_stats)| {
+ write_stats.iter().map(move |stat| (partition, stat))
+ })
+ })
+ }
+
+ /// Iterate over all replace file IDs across all partitions
+ pub fn iter_replace_file_ids(&self) -> impl Iterator<Item = (&String,
&String)> {
+ self.partition_to_replace_file_ids
+ .as_ref()
+ .into_iter()
+ .flat_map(|replace_ids| {
+ replace_ids.iter().flat_map(|(partition, file_ids)| {
+ file_ids.iter().map(move |file_id| (partition, file_id))
+ })
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use apache_avro::{AvroSchema, Schema};
+ use serde_json::json;
+
+ #[test]
+ fn test_parse_commit_metadata() {
+ let json = json!({
+ "version": 1,
+ "operationType": "UPSERT",
+ "partitionToWriteStats": {
+ "byteField=20/shortField=100": [{
+ "fileId": "bb7c3a45-387f-490d-aab2-981c3f1a8ada-0",
+ "path":
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
+ "numWrites": 100,
+ "totalWriteBytes": 1024
+ }]
+ },
+ "compacted": false
+ });
+
+ let metadata: HoodieCommitMetadata =
serde_json::from_value(json).unwrap();
+ assert_eq!(metadata.version, Some(1));
+ assert_eq!(metadata.operation_type, Some("UPSERT".to_string()));
+ assert!(metadata.partition_to_write_stats.is_some());
+
+ let stats = metadata
+ .get_partition_write_stats("byteField=20/shortField=100")
+ .unwrap();
+ assert_eq!(stats.len(), 1);
+ assert_eq!(
+ stats[0].file_id,
+ Some("bb7c3a45-387f-490d-aab2-981c3f1a8ada-0".to_string())
+ );
+ }
+
+ #[test]
+ fn test_parse_replace_file_ids() {
+ let json = json!({
+ "partitionToReplaceFileIds": {
+ "30": ["d398fae1-c0e6-4098-8124-f55f7098bdba-0"],
+ "20": ["88163884-fef0-4aab-865d-c72327a8a1d5-0"]
+ }
+ });
+
+ let metadata: HoodieCommitMetadata =
serde_json::from_value(json).unwrap();
+
+ let file_ids = metadata.get_partition_replace_file_ids("30").unwrap();
+ assert_eq!(file_ids.len(), 1);
+ assert_eq!(file_ids[0], "d398fae1-c0e6-4098-8124-f55f7098bdba-0");
+ }
+
+ #[test]
+ fn test_iter_write_stats() {
+ let json = json!({
+ "partitionToWriteStats": {
+ "p1": [{
+ "fileId": "file1",
+ "path": "p1/file1.parquet"
+ }],
+ "p2": [{
+ "fileId": "file2",
+ "path": "p2/file2.parquet"
+ }]
+ }
+ });
+
+ let metadata: HoodieCommitMetadata =
serde_json::from_value(json).unwrap();
+ let count = metadata.iter_write_stats().count();
+ assert_eq!(count, 2);
+ }
+
+ #[test]
+ fn test_avro_schema_generation() {
+ // Test that the derived Avro schema can be generated
+ let schema = HoodieCommitMetadata::get_schema();
+
+ // Verify it's a record type
+ if let Schema::Record(ref record) = schema {
+ assert_eq!(record.name.name, "HoodieCommitMetadata");
+ assert_eq!(
+ record.name.namespace,
+ Some("org.apache.hudi.avro.model".to_string())
+ );
+
+ // Verify key fields exist
+ let field_names: Vec<_> = record.fields.iter().map(|f|
f.name.as_str()).collect();
+ assert!(field_names.contains(&"version"));
+ assert!(field_names.contains(&"operationType"));
+ assert!(field_names.contains(&"partitionToWriteStats"));
+ assert!(field_names.contains(&"partitionToReplaceFileIds"));
+ assert!(field_names.contains(&"compacted"));
+ assert!(field_names.contains(&"extraMetadata"));
+ } else {
+ panic!("Expected Record schema");
+ }
+
+ // Print schema for verification (useful for debugging)
+ println!("Generated Avro Schema:\n{}", schema.canonical_form());
+ }
+
+ #[test]
+ fn test_write_stat_avro_schema() {
+ // Test that HoodieWriteStat also has proper Avro schema
+ let schema = HoodieWriteStat::get_schema();
+
+ if let Schema::Record(ref record) = schema {
+ assert_eq!(record.name.name, "HoodieWriteStat");
+ assert_eq!(
+ record.name.namespace,
+ Some("org.apache.hudi.avro.model".to_string())
+ );
+
+ let field_names: Vec<_> = record.fields.iter().map(|f|
f.name.as_str()).collect();
+ assert!(field_names.contains(&"fileId"));
+ assert!(field_names.contains(&"path"));
+ assert!(field_names.contains(&"baseFile"));
+ assert!(field_names.contains(&"logFiles"));
+ } else {
+ panic!("Expected Record schema");
+ }
+ }
+
+ #[test]
+ fn test_from_json_bytes() {
+ let json_str = r#"{
+ "version": 1,
+ "operationType": "UPSERT",
+ "partitionToWriteStats": {
+ "p1": [{
+ "fileId": "file1",
+ "path": "p1/file1.parquet"
+ }]
+ },
+ "compacted": false
+ }"#;
+
+ let metadata =
HoodieCommitMetadata::from_json_bytes(json_str.as_bytes()).unwrap();
+ assert_eq!(metadata.version, Some(1));
+ assert_eq!(metadata.operation_type, Some("UPSERT".to_string()));
+ }
+
+ #[test]
+ fn test_from_json_bytes_invalid() {
+ let invalid_json = b"invalid json";
+ let result = HoodieCommitMetadata::from_json_bytes(invalid_json);
+ assert!(result.is_err());
+ assert!(matches!(result, Err(CoreError::CommitMetadata(_))));
+ }
+
+ #[test]
+ fn test_get_partition_write_stats() {
+ let json = json!({
+ "partitionToWriteStats": {
+ "p1": [{
+ "fileId": "file1",
+ "path": "p1/file1.parquet"
+ }],
+ "p2": [{
+ "fileId": "file2",
+ "path": "p2/file2.parquet"
+ }]
+ }
+ });
+
+ let metadata: HoodieCommitMetadata =
serde_json::from_value(json).unwrap();
+
+ // Test getting existing partition
+ let p1_stats = metadata.get_partition_write_stats("p1").unwrap();
+ assert_eq!(p1_stats.len(), 1);
+ assert_eq!(p1_stats[0].file_id, Some("file1".to_string()));
+
+ // Test getting non-existent partition
+ assert!(metadata.get_partition_write_stats("p3").is_none());
+ }
+
+ #[test]
+ fn test_get_partitions_with_writes() {
+ let json = json!({
+ "partitionToWriteStats": {
+ "p1": [{
+ "fileId": "file1",
+ "path": "p1/file1.parquet"
+ }],
+ "p2": [{
+ "fileId": "file2",
+ "path": "p2/file2.parquet"
+ }]
+ }
+ });
+
+ let metadata: HoodieCommitMetadata =
serde_json::from_value(json).unwrap();
+ let mut partitions = metadata.get_partitions_with_writes();
+ partitions.sort();
+ assert_eq!(partitions, vec!["p1".to_string(), "p2".to_string()]);
+ }
+
+ #[test]
+ fn test_get_partitions_with_writes_empty() {
+ let json = json!({});
+ let metadata: HoodieCommitMetadata =
serde_json::from_value(json).unwrap();
+ let partitions = metadata.get_partitions_with_writes();
+ assert_eq!(partitions.len(), 0);
+ }
+
+ #[test]
+ fn test_get_partition_replace_file_ids() {
+ let json = json!({
+ "partitionToReplaceFileIds": {
+ "30": ["d398fae1-c0e6-4098-8124-f55f7098bdba-0"],
+ "20": ["88163884-fef0-4aab-865d-c72327a8a1d5-0"]
+ }
+ });
+
+ let metadata: HoodieCommitMetadata =
serde_json::from_value(json).unwrap();
+
+ // Test getting existing partition
+ let file_ids_30 =
metadata.get_partition_replace_file_ids("30").unwrap();
+ assert_eq!(file_ids_30.len(), 1);
+ assert_eq!(file_ids_30[0], "d398fae1-c0e6-4098-8124-f55f7098bdba-0");
+
+ // Test getting non-existent partition
+ assert!(metadata.get_partition_replace_file_ids("40").is_none());
+ }
+
+ #[test]
+ fn test_get_partitions_with_replacements() {
+ let json = json!({
+ "partitionToReplaceFileIds": {
+ "30": ["file1"],
+ "20": ["file2"]
+ }
+ });
+
+ let metadata: HoodieCommitMetadata =
serde_json::from_value(json).unwrap();
+ let mut partitions = metadata.get_partitions_with_replacements();
+ partitions.sort();
+ assert_eq!(partitions, vec!["20".to_string(), "30".to_string()]);
+ }
+
+ #[test]
+ fn test_get_partitions_with_replacements_empty() {
+ let json = json!({});
+ let metadata: HoodieCommitMetadata =
serde_json::from_value(json).unwrap();
+ let partitions = metadata.get_partitions_with_replacements();
+ assert_eq!(partitions.len(), 0);
+ }
+
+ #[test]
+ fn test_iter_replace_file_ids() {
+ let json = json!({
+ "partitionToReplaceFileIds": {
+ "p1": ["file1", "file2"],
+ "p2": ["file3"]
+ }
+ });
+
+ let metadata: HoodieCommitMetadata =
serde_json::from_value(json).unwrap();
+ let count = metadata.iter_replace_file_ids().count();
+ assert_eq!(count, 3);
+
+ let file_ids: Vec<_> = metadata
+ .iter_replace_file_ids()
+ .map(|(_, file_id)| file_id.as_str())
+ .collect();
+ assert!(file_ids.contains(&"file1"));
+ assert!(file_ids.contains(&"file2"));
+ assert!(file_ids.contains(&"file3"));
+ }
+
+ #[test]
+ fn test_iter_replace_file_ids_empty() {
+ let json = json!({});
+ let metadata: HoodieCommitMetadata =
serde_json::from_value(json).unwrap();
+ let count = metadata.iter_replace_file_ids().count();
+ assert_eq!(count, 0);
+ }
+}
diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs
index e7ae3b1..1385fbb 100644
--- a/crates/core/src/metadata/mod.rs
+++ b/crates/core/src/metadata/mod.rs
@@ -16,7 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
+pub mod commit;
pub mod meta_field;
+pub mod replace_commit;
pub const HUDI_METADATA_DIR: &str = ".hoodie";
pub const DELTALAKE_METADATA_DIR: &str = "_delta_log";
diff --git a/crates/core/src/metadata/replace_commit.rs b/crates/core/src/metadata/replace_commit.rs
new file mode 100644
index 0000000..b1c7812
--- /dev/null
+++ b/crates/core/src/metadata/replace_commit.rs
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::CoreError;
+use crate::metadata::commit::HoodieWriteStat;
+use crate::Result;
+use apache_avro_derive::AvroSchema as DeriveAvroSchema;
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+use std::collections::HashMap;
+
+/// Represents the metadata for a Hudi Replace Commit
+///
+/// This is modeled from HoodieReplaceCommitMetadata.avsc.
+#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase")]
+#[avro(namespace = "org.apache.hudi.avro.model")]
+pub struct HoodieReplaceCommitMetadata {
+ // version: ["int","null"] with default 1 in Avro; we model as Option<i32>
+ pub version: Option<i32>,
+ #[avro(rename = "operationType")]
+ pub operation_type: Option<String>,
+ #[avro(rename = "partitionToWriteStats")]
+ pub partition_to_write_stats: Option<HashMap<String,
Vec<HoodieWriteStat>>>,
+ pub compacted: Option<bool>,
+ #[avro(rename = "extraMetadata")]
+ pub extra_metadata: Option<HashMap<String, String>>,
+ #[avro(rename = "partitionToReplaceFileIds")]
+ pub partition_to_replace_file_ids: Option<HashMap<String, Vec<String>>>,
+}
+
+impl HoodieReplaceCommitMetadata {
+ /// Parse replace commit metadata from a serde_json Map
+ pub fn from_json_map(map: &Map<String, Value>) -> Result<Self> {
+ serde_json::from_value(Value::Object(map.clone())).map_err(|e| {
+ CoreError::CommitMetadata(format!("Failed to parse commit
metadata: {}", e))
+ })
+ }
+
+ /// Parse replace commit metadata from JSON bytes
+ pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
+ serde_json::from_slice(bytes).map_err(|e| {
+ CoreError::CommitMetadata(format!("Failed to parse commit
metadata: {}", e))
+ })
+ }
+
+ /// Iterate over all replace file IDs across all partitions
+ pub fn iter_replace_file_ids(&self) -> impl Iterator<Item = (&String,
&String)> {
+ self.partition_to_replace_file_ids
+ .as_ref()
+ .into_iter()
+ .flat_map(|replace_ids| {
+ replace_ids.iter().flat_map(|(partition, file_ids)| {
+ file_ids.iter().map(move |file_id| (partition, file_id))
+ })
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use serde_json::json;
+
+ #[test]
+ fn test_parse_replace_commit() {
+ let json = json!({
+ "partitionToReplaceFileIds": {
+ "30": ["a-0"],
+ "20": ["b-0", "b-1"],
+ "": ["c-0"]
+ },
+ "extraMetadata": {"k":"v"},
+ "version": 1,
+ "operationType": "REPLACE_COMMIT"
+ });
+
+ let metadata: HoodieReplaceCommitMetadata =
serde_json::from_value(json).unwrap();
+ let ids: Vec<(&String, &String)> =
metadata.iter_replace_file_ids().collect();
+ assert_eq!(ids.len(), 4);
+ }
+
+ #[test]
+ fn test_from_json_bytes() {
+ let json_str = r#"{
+ "partitionToReplaceFileIds": {
+ "30": ["a-0"],
+ "20": ["b-0"]
+ },
+ "version": 1,
+ "operationType": "REPLACE_COMMIT"
+ }"#;
+
+ let metadata =
HoodieReplaceCommitMetadata::from_json_bytes(json_str.as_bytes()).unwrap();
+ assert_eq!(metadata.version, Some(1));
+ assert_eq!(metadata.operation_type,
Some("REPLACE_COMMIT".to_string()));
+ }
+
+ #[test]
+ fn test_from_json_bytes_invalid() {
+ let invalid_json = b"invalid json";
+ let result =
HoodieReplaceCommitMetadata::from_json_bytes(invalid_json);
+ assert!(result.is_err());
+ assert!(matches!(result, Err(CoreError::CommitMetadata(_))));
+ }
+
+ #[test]
+ fn test_iter_replace_file_ids_empty() {
+ let json = json!({});
+ let metadata: HoodieReplaceCommitMetadata =
serde_json::from_value(json).unwrap();
+ let count = metadata.iter_replace_file_ids().count();
+ assert_eq!(count, 0);
+ }
+}
diff --git a/crates/core/src/schema/resolver.rs b/crates/core/src/schema/resolver.rs
index 803f8c6..e18ba1f 100644
--- a/crates/core/src/schema/resolver.rs
+++ b/crates/core/src/schema/resolver.rs
@@ -19,6 +19,7 @@
use crate::avro_to_arrow::to_arrow_schema;
use crate::config::table::HudiTableConfig;
use crate::error::{CoreError, Result};
+use crate::metadata::commit::HoodieCommitMetadata;
use crate::schema::prepend_meta_fields;
use crate::storage::Storage;
use crate::table::Table;
@@ -121,55 +122,40 @@ async fn resolve_schema_from_base_file(
commit_metadata: &Map<String, Value>,
storage: Arc<Storage>,
) -> Result<Schema> {
- let first_partition = commit_metadata
- .get("partitionToWriteStats")
- .and_then(|v| v.as_object());
+ let metadata = HoodieCommitMetadata::from_json_map(commit_metadata)?;
- let partition_path = first_partition
- .and_then(|obj| obj.keys().next())
- .ok_or_else(|| {
+ // Get the first write stat from any partition
+ let (partition, first_stat) =
metadata.iter_write_stats().next().ok_or_else(|| {
+ CoreError::CommitMetadata(
+ "Failed to resolve the latest schema: no write status in commit
metadata".to_string(),
+ )
+ })?;
+
+ // Try to get the base file path from either 'path' or 'baseFile' field
+ if let Some(path) = &first_stat.path {
+ if path.ends_with(".parquet") {
+ return Ok(storage.get_parquet_file_schema(path).await?);
+ }
+ }
+
+ // Handle deltacommit case with baseFile
+ if let Some(base_file) = &first_stat.base_file {
+ let parquet_file_path_buf = PathBuf::from_str(partition)
+ .map_err(|e| {
+ CoreError::CommitMetadata(format!("Failed to resolve the
latest schema: {}", e))
+ })?
+ .join(base_file);
+ let path = parquet_file_path_buf.to_str().ok_or_else(|| {
CoreError::CommitMetadata(
- "Failed to resolve the latest schema: no write status in
commit metadata"
- .to_string(),
+ "Failed to resolve the latest schema: invalid file
path".to_string(),
)
})?;
-
- let first_value = first_partition
- .and_then(|obj| obj.values().next())
- .and_then(|value| value.as_array())
- .and_then(|arr| arr.first());
-
- let base_file_path = first_value.and_then(|v| v["path"].as_str());
- match base_file_path {
- Some(path) if path.ends_with(".parquet") => {
- Ok(storage.get_parquet_file_schema(path).await?)
- }
- Some(_) => {
- // deltacommit case
- // TODO: properly parse deltacommit structure
- let base_file = first_value
- .and_then(|v| v["baseFile"].as_str())
- .ok_or_else(|| {
- CoreError::CommitMetadata(
- "Failed to resolve the latest schema: no file path
found".to_string(),
- )
- })?;
- let parquet_file_path_buf = PathBuf::from_str(partition_path)
- .map_err(|e| {
- CoreError::CommitMetadata(format!("Failed to resolve the
latest schema: {}", e))
- })?
- .join(base_file);
- let path = parquet_file_path_buf.to_str().ok_or_else(|| {
- CoreError::CommitMetadata(
- "Failed to resolve the latest schema: invalid file
path".to_string(),
- )
- })?;
- Ok(storage.get_parquet_file_schema(path).await?)
- }
- None => Err(CoreError::CommitMetadata(
- "Failed to resolve the latest schema: no file path
found".to_string(),
- )),
+ return Ok(storage.get_parquet_file_schema(path).await?);
}
+
+ Err(CoreError::CommitMetadata(
+ "Failed to resolve the latest schema: no file path found".to_string(),
+ ))
}
fn sanitize_avro_schema_str(avro_schema_str: &str) -> String {
@@ -196,3 +182,89 @@ fn extract_avro_schema_from_commit_metadata(
.map(|s| s.to_string())
})
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use serde_json::json;
+
+ #[test]
+ fn test_resolve_avro_schema_from_commit_metadata_with_schema() {
+ let metadata = json!({
+ "extraMetadata": {
+ "schema":
r#"{"type":"record","name":"TestRecord","fields":[{"name":"id","type":"int"}]}"#
+ }
+ })
+ .as_object()
+ .unwrap()
+ .clone();
+
+ let result = resolve_avro_schema_from_commit_metadata(&metadata);
+ assert!(result.is_ok());
+ let schema = result.unwrap();
+ assert!(schema.contains("TestRecord"));
+ }
+
+ #[test]
+ fn test_resolve_avro_schema_from_commit_metadata_empty() {
+ let metadata = Map::new();
+ let result = resolve_avro_schema_from_commit_metadata(&metadata);
+ assert!(result.is_err());
+ assert!(matches!(result, Err(CoreError::CommitMetadata(_))));
+ }
+
+ #[test]
+ fn test_resolve_avro_schema_from_commit_metadata_no_schema() {
+ let metadata = json!({
+ "extraMetadata": {
+ "other": "value"
+ }
+ })
+ .as_object()
+ .unwrap()
+ .clone();
+
+ let result = resolve_avro_schema_from_commit_metadata(&metadata);
+ assert!(result.is_err());
+ assert!(matches!(result, Err(CoreError::SchemaNotFound(_))));
+ }
+
+ #[test]
+ fn test_sanitize_avro_schema_str() {
+ let schema_with_escape = r#"test\:schema"#;
+ let sanitized = sanitize_avro_schema_str(schema_with_escape);
+ assert_eq!(sanitized, "test:schema");
+
+ let schema_with_whitespace = " test schema ";
+ let sanitized = sanitize_avro_schema_str(schema_with_whitespace);
+ assert_eq!(sanitized, "test schema");
+ }
+
+ #[test]
+ fn test_extract_avro_schema_from_commit_metadata() {
+ let metadata = json!({
+ "extraMetadata": {
+ "schema": "test_schema"
+ }
+ })
+ .as_object()
+ .unwrap()
+ .clone();
+
+ let schema = extract_avro_schema_from_commit_metadata(&metadata);
+ assert_eq!(schema, Some("test_schema".to_string()));
+ }
+
+ #[test]
+ fn test_extract_avro_schema_from_commit_metadata_none() {
+ let metadata = json!({
+ "other": "value"
+ })
+ .as_object()
+ .unwrap()
+ .clone();
+
+ let schema = extract_avro_schema_from_commit_metadata(&metadata);
+ assert_eq!(schema, None);
+ }
+}