This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d59279  fix: handle schema resolution for empty commit (#359)
4d59279 is described below

commit 4d592794fd7aa7ca3ed16005f8948ec95ab45102
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu Jun 26 16:31:24 2025 -0500

    fix: handle schema resolution for empty commit (#359)
    
    Handle commit metadata created by empty commit
---
 crates/core/src/metadata/meta_field.rs             | 21 ++++++
 crates/core/src/schema/mod.rs                      | 45 +++++++++++++
 crates/core/src/table/mod.rs                       |  5 +-
 crates/core/src/timeline/mod.rs                    | 75 +++++++++++++++++++---
 .../.hoodie/20240402144910683.commit               |  9 +++
 .../.hoodie/20240402144910683.commit.requested     |  0
 .../.hoodie/20240402144910683.inflight             |  0
 .../.hoodie/hoodie.properties                      |  0
 8 files changed, 142 insertions(+), 13 deletions(-)

diff --git a/crates/core/src/metadata/meta_field.rs b/crates/core/src/metadata/meta_field.rs
index 7340ebc..bfccfd5 100644
--- a/crates/core/src/metadata/meta_field.rs
+++ b/crates/core/src/metadata/meta_field.rs
@@ -100,6 +100,27 @@ impl MetaField {
     pub fn schema_with_operation() -> SchemaRef {
         SCHEMA_WITH_OPERATION.clone()
     }
+
+    pub fn field_names() -> Vec<&'static str> {
+        vec![
+            MetaField::CommitTime.as_ref(),
+            MetaField::CommitSeqno.as_ref(),
+            MetaField::RecordKey.as_ref(),
+            MetaField::PartitionPath.as_ref(),
+            MetaField::FileName.as_ref(),
+        ]
+    }
+
+    pub fn field_names_with_operation() -> Vec<&'static str> {
+        vec![
+            MetaField::CommitTime.as_ref(),
+            MetaField::CommitSeqno.as_ref(),
+            MetaField::RecordKey.as_ref(),
+            MetaField::PartitionPath.as_ref(),
+            MetaField::FileName.as_ref(),
+            MetaField::Operation.as_ref(),
+        ]
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs
index bffa429..15c052e 100644
--- a/crates/core/src/schema/mod.rs
+++ b/crates/core/src/schema/mod.rs
@@ -16,4 +16,49 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::error::{CoreError, Result};
+use crate::metadata::meta_field::MetaField;
+use arrow_schema::{Schema, SchemaRef};
+
 pub mod delete;
+
+pub fn prepend_meta_fields(schema: SchemaRef) -> Result<Schema> {
+    let meta_field_schema = MetaField::schema();
+    Schema::try_merge([meta_field_schema.as_ref().clone(), schema.as_ref().clone()])
+        .map_err(CoreError::ArrowError)
+}
+
+pub fn prepend_meta_fields_with_operation(schema: SchemaRef) -> Result<Schema> {
+    let meta_field_schema = MetaField::schema_with_operation();
+    Schema::try_merge([meta_field_schema.as_ref().clone(), schema.as_ref().clone()])
+        .map_err(CoreError::ArrowError)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_schema::{DataType, Field};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_prepend_meta_fields() {
+        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, false)]);
+        let new_schema = prepend_meta_fields(Arc::new(schema)).unwrap();
+        assert_eq!(new_schema.fields().len(), 6);
+
+        let field_names: Vec<_> = new_schema.fields().iter().map(|f| f.name()).collect();
+        assert_eq!(field_names[..5], MetaField::field_names());
+        assert_eq!(field_names[5], "field1");
+    }
+
+    #[test]
+    fn test_prepend_meta_fields_with_operation() {
+        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, false)]);
+        let new_schema = prepend_meta_fields_with_operation(Arc::new(schema)).unwrap();
+        assert_eq!(new_schema.fields().len(), 7);
+
+        let field_names: Vec<_> = new_schema.fields().iter().map(|f| f.name()).collect();
+        assert_eq!(field_names[..6], MetaField::field_names_with_operation());
+        assert_eq!(field_names[6], "field1");
+    }
+}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index cf7bda9..949ca80 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -817,13 +817,10 @@ mod tests {
                 "timestampField",
                 "binaryField",
                 "arrayField",
-                "array",
+                "element",
                 "arr_struct_f1",
                 "arr_struct_f2",
                 "mapField",
-                "key_value",
-                "key",
-                "value",
                 "map_field_value_struct_f1",
                 "map_field_value_struct_f2",
                 "structField",
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 5a9bbdb..b76747b 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -20,16 +20,18 @@ pub mod instant;
 pub(crate) mod selector;
 pub(crate) mod util;
 
+use crate::avro_to_arrow::to_arrow_schema;
 use crate::config::HudiConfigs;
 use crate::error::CoreError;
 use crate::file_group::builder::{build_file_groups, build_replaced_file_groups, FileGroupMerger};
 use crate::file_group::FileGroup;
 use crate::metadata::HUDI_METADATA_DIR;
+use crate::schema::prepend_meta_fields;
 use crate::storage::Storage;
 use crate::timeline::instant::Action;
 use crate::timeline::selector::TimelineSelector;
 use crate::Result;
-use arrow_schema::Schema;
+use arrow_schema::{Schema, SchemaRef};
 use instant::Instant;
 use log::debug;
 use serde_json::{Map, Value};
@@ -236,9 +238,9 @@ impl Timeline {
         )
     }
 
-    /// Get the latest Avro schema string from the [Timeline].
-    pub async fn get_latest_avro_schema(&self) -> Result<String> {
-        let commit_metadata = self.get_latest_commit_metadata().await?;
+    fn extract_avro_schema_from_commit_metadata(
+        commit_metadata: &Map<String, Value>,
+    ) -> Option<String> {
         commit_metadata
             .get("extraMetadata")
             .and_then(|v| v.as_object())
@@ -247,17 +249,34 @@ impl Timeline {
                     .and_then(|v| v.as_str())
                     .map(|s| s.to_string())
             })
-            .ok_or_else(|| {
-                CoreError::CommitMetadata(
-                    "Failed to resolve the latest schema: no schema found".to_string(),
-                )
-            })
+    }
+
+    /// Get the latest Avro schema string from the [Timeline].
+    pub async fn get_latest_avro_schema(&self) -> Result<String> {
+        let commit_metadata = self.get_latest_commit_metadata().await?;
+        Self::extract_avro_schema_from_commit_metadata(&commit_metadata).ok_or_else(|| {
+            CoreError::CommitMetadata(
+                "Failed to resolve the latest schema: no schema found".to_string(),
+            )
+        })
     }
 
     /// Get the latest [arrow_schema::Schema] from the [Timeline].
     pub async fn get_latest_schema(&self) -> Result<Schema> {
         let commit_metadata = self.get_latest_commit_metadata().await?;
 
+        if let Some(avro_schema) = Self::extract_avro_schema_from_commit_metadata(&commit_metadata)
+        {
+            let avro_schema = apache_avro::schema::Schema::parse_str(&avro_schema)?;
+            let arrow_schema = to_arrow_schema(&avro_schema).map_err(|e| {
+                CoreError::CommitMetadata(format!(
+                    "Failed to convert the latest Avro schema: {}",
+                    e
+                ))
+            })?;
+            return prepend_meta_fields(SchemaRef::new(arrow_schema));
+        }
+
         let first_partition = commit_metadata
             .get("partitionToWriteStats")
             .and_then(|v| v.as_object());
@@ -376,6 +395,7 @@ mod tests {
     use hudi_test::SampleTable;
 
     use crate::config::table::HudiTableConfig;
+    use crate::metadata::meta_field::MetaField;
 
     async fn create_test_timeline(base_url: Url) -> Timeline {
         Timeline::new_from_storage(
@@ -451,4 +471,41 @@ mod tests {
         assert!(matches!(err, CoreError::Timeline(_)));
         assert!(err.to_string().contains("Failed to get commit metadata"));
     }
+
+    #[tokio::test]
+    async fn get_avro_schema() {
+        let base_url = Url::from_file_path(
+            canonicalize(Path::new("tests/data/timeline/commits_with_valid_schema")).unwrap(),
+        )
+        .unwrap();
+        let timeline = create_test_timeline(base_url).await;
+
+        let avro_schema = timeline.get_latest_avro_schema().await;
+        assert!(avro_schema.is_ok());
+        assert_eq!(
+            avro_schema.unwrap(),
+            
"{\"type\":\"record\",\"name\":\"v6_trips_record\",\"namespace\":\"hoodie.v6_trips\",\"fields\":[{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"uuid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rider\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"driver\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"fare\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"stri
 [...]
+        )
+    }
+
+    #[tokio::test]
+    async fn get_arrow_schema() {
+        let base_url = Url::from_file_path(
+            canonicalize(Path::new("tests/data/timeline/commits_with_valid_schema")).unwrap(),
+        )
+        .unwrap();
+        let timeline = create_test_timeline(base_url).await;
+
+        let arrow_schema = timeline.get_latest_schema().await;
+        assert!(arrow_schema.is_ok());
+        let arrow_schema = arrow_schema.unwrap();
+        let fields = arrow_schema
+            .fields
+            .iter()
+            .map(|f| f.name())
+            .collect::<Vec<_>>();
+        let mut expected_fields = MetaField::field_names();
+        expected_fields.extend_from_slice(&["ts", "uuid", "rider", "driver", "fare", "city"]);
+        assert_eq!(fields, expected_fields)
+    }
 }
diff --git a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit b/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit
new file mode 100644
index 0000000..3f44ba4
--- /dev/null
+++ b/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit
@@ -0,0 +1,9 @@
+{
+  "partitionToWriteStats" : { },
+  "compacted" : false,
+  "extraMetadata" : {
+    "schema" : 
"{\"type\":\"record\",\"name\":\"v6_trips_record\",\"namespace\":\"hoodie.v6_trips\",\"fields\":[{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"uuid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rider\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"driver\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"fare\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"s
 [...]
+    "deltastreamer.checkpoint.key" : "20250428232324387"
+  },
+  "operationType" : "BULK_INSERT"
+}
diff --git a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested b/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight b/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties b/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
new file mode 100644
index 0000000..e69de29

Reply via email to