This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4d59279 fix: handle schema resolution for empty commit (#359)
4d59279 is described below
commit 4d592794fd7aa7ca3ed16005f8948ec95ab45102
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu Jun 26 16:31:24 2025 -0500
fix: handle schema resolution for empty commit (#359)
Handle commit metadata created by empty commit
---
crates/core/src/metadata/meta_field.rs | 21 ++++++
crates/core/src/schema/mod.rs | 45 +++++++++++++
crates/core/src/table/mod.rs | 5 +-
crates/core/src/timeline/mod.rs | 75 +++++++++++++++++++---
.../.hoodie/20240402144910683.commit | 9 +++
.../.hoodie/20240402144910683.commit.requested | 0
.../.hoodie/20240402144910683.inflight | 0
.../.hoodie/hoodie.properties | 0
8 files changed, 142 insertions(+), 13 deletions(-)
diff --git a/crates/core/src/metadata/meta_field.rs b/crates/core/src/metadata/meta_field.rs
index 7340ebc..bfccfd5 100644
--- a/crates/core/src/metadata/meta_field.rs
+++ b/crates/core/src/metadata/meta_field.rs
@@ -100,6 +100,27 @@ impl MetaField {
pub fn schema_with_operation() -> SchemaRef {
SCHEMA_WITH_OPERATION.clone()
}
+
+ pub fn field_names() -> Vec<&'static str> {
+ vec![
+ MetaField::CommitTime.as_ref(),
+ MetaField::CommitSeqno.as_ref(),
+ MetaField::RecordKey.as_ref(),
+ MetaField::PartitionPath.as_ref(),
+ MetaField::FileName.as_ref(),
+ ]
+ }
+
+ pub fn field_names_with_operation() -> Vec<&'static str> {
+ vec![
+ MetaField::CommitTime.as_ref(),
+ MetaField::CommitSeqno.as_ref(),
+ MetaField::RecordKey.as_ref(),
+ MetaField::PartitionPath.as_ref(),
+ MetaField::FileName.as_ref(),
+ MetaField::Operation.as_ref(),
+ ]
+ }
}
#[cfg(test)]
diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs
index bffa429..15c052e 100644
--- a/crates/core/src/schema/mod.rs
+++ b/crates/core/src/schema/mod.rs
@@ -16,4 +16,49 @@
* specific language governing permissions and limitations
* under the License.
*/
+use crate::error::{CoreError, Result};
+use crate::metadata::meta_field::MetaField;
+use arrow_schema::{Schema, SchemaRef};
+
pub mod delete;
+
+pub fn prepend_meta_fields(schema: SchemaRef) -> Result<Schema> {
+ let meta_field_schema = MetaField::schema();
+ Schema::try_merge([meta_field_schema.as_ref().clone(), schema.as_ref().clone()])
+ .map_err(CoreError::ArrowError)
+}
+
+pub fn prepend_meta_fields_with_operation(schema: SchemaRef) -> Result<Schema> {
+ let meta_field_schema = MetaField::schema_with_operation();
+ Schema::try_merge([meta_field_schema.as_ref().clone(), schema.as_ref().clone()])
+ .map_err(CoreError::ArrowError)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_schema::{DataType, Field};
+ use std::sync::Arc;
+
+ #[test]
+ fn test_prepend_meta_fields() {
+ let schema = Schema::new(vec![Field::new("field1", DataType::Int32, false)]);
+ let new_schema = prepend_meta_fields(Arc::new(schema)).unwrap();
+ assert_eq!(new_schema.fields().len(), 6);
+
+ let field_names: Vec<_> = new_schema.fields().iter().map(|f| f.name()).collect();
+ assert_eq!(field_names[..5], MetaField::field_names());
+ assert_eq!(field_names[5], "field1");
+ }
+
+ #[test]
+ fn test_prepend_meta_fields_with_operation() {
+ let schema = Schema::new(vec![Field::new("field1", DataType::Int32, false)]);
+ let new_schema = prepend_meta_fields_with_operation(Arc::new(schema)).unwrap();
+ assert_eq!(new_schema.fields().len(), 7);
+
+ let field_names: Vec<_> = new_schema.fields().iter().map(|f| f.name()).collect();
+ assert_eq!(field_names[..6], MetaField::field_names_with_operation());
+ assert_eq!(field_names[6], "field1");
+ }
+}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index cf7bda9..949ca80 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -817,13 +817,10 @@ mod tests {
"timestampField",
"binaryField",
"arrayField",
- "array",
+ "element",
"arr_struct_f1",
"arr_struct_f2",
"mapField",
- "key_value",
- "key",
- "value",
"map_field_value_struct_f1",
"map_field_value_struct_f2",
"structField",
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 5a9bbdb..b76747b 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -20,16 +20,18 @@ pub mod instant;
pub(crate) mod selector;
pub(crate) mod util;
+use crate::avro_to_arrow::to_arrow_schema;
use crate::config::HudiConfigs;
use crate::error::CoreError;
use crate::file_group::builder::{build_file_groups, build_replaced_file_groups, FileGroupMerger};
use crate::file_group::FileGroup;
use crate::metadata::HUDI_METADATA_DIR;
+use crate::schema::prepend_meta_fields;
use crate::storage::Storage;
use crate::timeline::instant::Action;
use crate::timeline::selector::TimelineSelector;
use crate::Result;
-use arrow_schema::Schema;
+use arrow_schema::{Schema, SchemaRef};
use instant::Instant;
use log::debug;
use serde_json::{Map, Value};
@@ -236,9 +238,9 @@ impl Timeline {
)
}
- /// Get the latest Avro schema string from the [Timeline].
- pub async fn get_latest_avro_schema(&self) -> Result<String> {
- let commit_metadata = self.get_latest_commit_metadata().await?;
+ fn extract_avro_schema_from_commit_metadata(
+ commit_metadata: &Map<String, Value>,
+ ) -> Option<String> {
commit_metadata
.get("extraMetadata")
.and_then(|v| v.as_object())
@@ -247,17 +249,34 @@ impl Timeline {
.and_then(|v| v.as_str())
.map(|s| s.to_string())
})
- .ok_or_else(|| {
- CoreError::CommitMetadata(
- "Failed to resolve the latest schema: no schema found".to_string(),
- )
- })
+ }
+
+ /// Get the latest Avro schema string from the [Timeline].
+ pub async fn get_latest_avro_schema(&self) -> Result<String> {
+ let commit_metadata = self.get_latest_commit_metadata().await?;
+ Self::extract_avro_schema_from_commit_metadata(&commit_metadata).ok_or_else(|| {
+ CoreError::CommitMetadata(
+ "Failed to resolve the latest schema: no schema found".to_string(),
+ )
+ })
}
/// Get the latest [arrow_schema::Schema] from the [Timeline].
pub async fn get_latest_schema(&self) -> Result<Schema> {
let commit_metadata = self.get_latest_commit_metadata().await?;
+ if let Some(avro_schema) = Self::extract_avro_schema_from_commit_metadata(&commit_metadata)
+ {
+ let avro_schema = apache_avro::schema::Schema::parse_str(&avro_schema)?;
+ let arrow_schema = to_arrow_schema(&avro_schema).map_err(|e| {
+ CoreError::CommitMetadata(format!(
+ "Failed to convert the latest Avro schema: {}",
+ e
+ ))
+ })?;
+ return prepend_meta_fields(SchemaRef::new(arrow_schema));
+ }
+
let first_partition = commit_metadata
.get("partitionToWriteStats")
.and_then(|v| v.as_object());
@@ -376,6 +395,7 @@ mod tests {
use hudi_test::SampleTable;
use crate::config::table::HudiTableConfig;
+ use crate::metadata::meta_field::MetaField;
async fn create_test_timeline(base_url: Url) -> Timeline {
Timeline::new_from_storage(
@@ -451,4 +471,41 @@ mod tests {
assert!(matches!(err, CoreError::Timeline(_)));
assert!(err.to_string().contains("Failed to get commit metadata"));
}
+
+ #[tokio::test]
+ async fn get_avro_schema() {
+ let base_url = Url::from_file_path(
+ canonicalize(Path::new("tests/data/timeline/commits_with_valid_schema")).unwrap(),
+ )
+ .unwrap();
+ let timeline = create_test_timeline(base_url).await;
+
+ let avro_schema = timeline.get_latest_avro_schema().await;
+ assert!(avro_schema.is_ok());
+ assert_eq!(
+ avro_schema.unwrap(),
+
"{\"type\":\"record\",\"name\":\"v6_trips_record\",\"namespace\":\"hoodie.v6_trips\",\"fields\":[{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"uuid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rider\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"driver\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"fare\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"stri
[...]
+ )
+ }
+
+ #[tokio::test]
+ async fn get_arrow_schema() {
+ let base_url = Url::from_file_path(
+ canonicalize(Path::new("tests/data/timeline/commits_with_valid_schema")).unwrap(),
+ )
+ .unwrap();
+ let timeline = create_test_timeline(base_url).await;
+
+ let arrow_schema = timeline.get_latest_schema().await;
+ assert!(arrow_schema.is_ok());
+ let arrow_schema = arrow_schema.unwrap();
+ let fields = arrow_schema
+ .fields
+ .iter()
+ .map(|f| f.name())
+ .collect::<Vec<_>>();
+ let mut expected_fields = MetaField::field_names();
+ expected_fields.extend_from_slice(&["ts", "uuid", "rider", "driver", "fare", "city"]);
+ assert_eq!(fields, expected_fields)
+ }
}
diff --git a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit b/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit
new file mode 100644
index 0000000..3f44ba4
--- /dev/null
+++ b/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit
@@ -0,0 +1,9 @@
+{
+ "partitionToWriteStats" : { },
+ "compacted" : false,
+ "extraMetadata" : {
+ "schema" :
"{\"type\":\"record\",\"name\":\"v6_trips_record\",\"namespace\":\"hoodie.v6_trips\",\"fields\":[{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"uuid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rider\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"driver\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"fare\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"s
[...]
+ "deltastreamer.checkpoint.key" : "20250428232324387"
+ },
+ "operationType" : "BULK_INSERT"
+}
diff --git a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested b/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight b/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties b/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
new file mode 100644
index 0000000..e69de29