This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new ce08d9d  refactor: improve schema resolution flow (#364)
ce08d9d is described below

commit ce08d9d6546f251b044c1cd6f574339236fabcdb
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Jun 28 00:43:42 2025 -0500

    refactor: improve schema resolution flow (#364)
    
    Added `crates/core/src/schema/resolver.rs` to keep all schema resolution 
functions.
    
    For empty tables with no commits, get the `hoodie.table.create.schema` from 
the table props if available.
---
 crates/core/src/config/table.rs                    |   5 +
 crates/core/src/error.rs                           |   6 +
 crates/core/src/schema/mod.rs                      |  21 +-
 crates/core/src/schema/resolver.rs                 | 198 ++++++++++++++++++
 crates/core/src/table/builder.rs                   |   5 +-
 crates/core/src/table/mod.rs                       | 144 ++++++++-----
 crates/core/src/timeline/mod.rs                    | 222 ++++++++++-----------
 .../.hoodie/hoodie.properties                      |  14 ++
 .../.hoodie/20250628002223107.commit               |  94 +++++++++
 .../.hoodie/20250628002223107.commit.requested}    |   0
 .../.hoodie/20250628002223107.inflight             |  82 ++++++++
 ...15355d9b0ed-0_2-13-37_20250628002223107.parquet | Bin 0 -> 436045 bytes
 ...eedbe6e09ed-0_0-13-35_20250628002223107.parquet | Bin 0 -> 436228 bytes
 ...7e75252c69f-0_1-13-36_20250628002223107.parquet | Bin 0 -> 436064 bytes
 .../.hoodie/20250331030642808.deltacommit          |  94 +++++++++
 .../.hoodie/20250331030642808.deltacommit.inflight |  82 ++++++++
 .../20250331030642808.deltacommit.requested}       |   0
 .../.hoodie/20250331030645735.deltacommit          |  43 ++++
 .../.hoodie/20250331030645735.deltacommit.inflight |  56 ++++++
 .../20250331030645735.deltacommit.requested}       |   0
 ...-5ff632f79224-0_20250331030642808.log.1_0-26-85 | Bin 0 -> 1148 bytes
 ...ff632f79224-0_0-13-60_20250331030642808.parquet | Bin 0 -> 436406 bytes
 .../.hoodie/20240402144910683.commit               |   1 +
 .../.hoodie/20240402144910683.commit.requested     |   0
 .../.hoodie/20240402144910683.inflight             |   0
 .../.hoodie/hoodie.properties                      |   0
 .../.hoodie/20240402144910683.commit               |   6 +
 .../.hoodie/20240402144910683.commit.requested     |   0
 .../.hoodie/20240402144910683.inflight             |   0
 .../.hoodie/hoodie.properties                      |   0
 .../.hoodie/20240402144910683.commit               |   0
 .../.hoodie/20240402144910683.commit.requested     |   0
 .../.hoodie/20240402144910683.inflight             |   0
 .../.hoodie/hoodie.properties                      |   0
 crates/datafusion/src/lib.rs                       |   9 +-
 crates/test/src/util.rs                            |  32 +++
 36 files changed, 931 insertions(+), 183 deletions(-)

diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 149938e..5264236 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -56,6 +56,9 @@ pub enum HudiTableConfig {
     /// It is added as the last entry in hoodie.properties and then used to 
validate while reading table config.
     Checksum,
 
+    /// Avro schema used when creating the table.
+    CreateSchema,
+
     /// Database name that will be used for incremental query.
     /// If different databases have the same table name during incremental 
query,
     /// we can set it to limit the table name under a specific database
@@ -122,6 +125,7 @@ impl AsRef<str> for HudiTableConfig {
             Self::BaseFileFormat => "hoodie.table.base.file.format",
             Self::BasePath => "hoodie.base.path",
             Self::Checksum => "hoodie.table.checksum",
+            Self::CreateSchema => "hoodie.table.create.schema",
             Self::DatabaseName => "hoodie.database.name",
             Self::DropsPartitionFields => 
"hoodie.datasource.write.drop.partition.columns",
             Self::IsHiveStylePartitioning => 
"hoodie.datasource.write.hive_style_partitioning",
@@ -186,6 +190,7 @@ impl ConfigParser for HudiTableConfig {
                     isize::from_str(v).map_err(|e| ParseInt(self.key(), 
v.to_string(), e))
                 })
                 .map(HudiConfigValue::Integer),
+            Self::CreateSchema => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::DatabaseName => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::DropsPartitionFields => get_result
                 .and_then(|v| {
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index 9d4f25b..2cff435 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -42,6 +42,9 @@ pub enum CoreError {
     #[error("Data type error: {0}")]
     Schema(String),
 
+    #[error("{0}")]
+    SchemaNotFound(String),
+
     #[error("File group error: {0}")]
     FileGroup(String),
 
@@ -72,6 +75,9 @@ pub enum CoreError {
     #[error("Timeline error: {0}")]
     Timeline(String),
 
+    #[error("Timeline has no completed commit.")]
+    TimelineNoCommit,
+
     #[error("{0}")]
     TimestampParsingError(String),
 
diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs
index 15c052e..52ce19a 100644
--- a/crates/core/src/schema/mod.rs
+++ b/crates/core/src/schema/mod.rs
@@ -21,6 +21,7 @@ use crate::metadata::meta_field::MetaField;
 use arrow_schema::{Schema, SchemaRef};
 
 pub mod delete;
+pub mod resolver;
 
 pub fn prepend_meta_fields(schema: SchemaRef) -> Result<Schema> {
     let meta_field_schema = MetaField::schema();
@@ -28,6 +29,7 @@ pub fn prepend_meta_fields(schema: SchemaRef) -> 
Result<Schema> {
         .map_err(CoreError::ArrowError)
 }
 
+// TODO use this when applicable, like some table config says there is an 
operation field
 pub fn prepend_meta_fields_with_operation(schema: SchemaRef) -> Result<Schema> 
{
     let meta_field_schema = MetaField::schema_with_operation();
     Schema::try_merge([meta_field_schema.as_ref().clone(), 
schema.as_ref().clone()])
@@ -38,27 +40,26 @@ pub fn prepend_meta_fields_with_operation(schema: 
SchemaRef) -> Result<Schema> {
 mod tests {
     use super::*;
     use arrow_schema::{DataType, Field};
+    use hudi_test::assert_arrow_field_names_eq;
     use std::sync::Arc;
 
     #[test]
     fn test_prepend_meta_fields() {
         let schema = Schema::new(vec![Field::new("field1", DataType::Int32, 
false)]);
         let new_schema = prepend_meta_fields(Arc::new(schema)).unwrap();
-        assert_eq!(new_schema.fields().len(), 6);
-
-        let field_names: Vec<_> = new_schema.fields().iter().map(|f| 
f.name()).collect();
-        assert_eq!(field_names[..5], MetaField::field_names());
-        assert_eq!(field_names[5], "field1");
+        assert_arrow_field_names_eq!(
+            new_schema,
+            [MetaField::field_names(), vec!["field1"]].concat()
+        )
     }
 
     #[test]
     fn test_prepend_meta_fields_with_operation() {
         let schema = Schema::new(vec![Field::new("field1", DataType::Int32, 
false)]);
         let new_schema = 
prepend_meta_fields_with_operation(Arc::new(schema)).unwrap();
-        assert_eq!(new_schema.fields().len(), 7);
-
-        let field_names: Vec<_> = new_schema.fields().iter().map(|f| 
f.name()).collect();
-        assert_eq!(field_names[..6], MetaField::field_names_with_operation());
-        assert_eq!(field_names[6], "field1");
+        assert_arrow_field_names_eq!(
+            new_schema,
+            [MetaField::field_names_with_operation(), vec!["field1"]].concat()
+        )
     }
 }
diff --git a/crates/core/src/schema/resolver.rs 
b/crates/core/src/schema/resolver.rs
new file mode 100644
index 0000000..7095849
--- /dev/null
+++ b/crates/core/src/schema/resolver.rs
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::avro_to_arrow::to_arrow_schema;
+use crate::config::table::HudiTableConfig;
+use crate::error::{CoreError, Result};
+use crate::schema::prepend_meta_fields;
+use crate::storage::Storage;
+use crate::table::Table;
+use apache_avro::schema::Schema as AvroSchema;
+use arrow_schema::{Schema, SchemaRef};
+use serde_json::{Map, Value};
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::sync::Arc;
+
+/// Resolves the [`arrow_schema::Schema`] for a given Hudi table.
+///
+/// The resolution process follows these steps:
+/// - If the timeline has commit metadata, read the schema field from it.
+///   - If the commit metadata has no schema, read the schema from the base 
file pointed by the first entry in the write-status of the commit metadata.
+/// - If the timeline has no commit metadata, read 
[`HudiTableConfig::CreateSchema`] from `hoodie.properties`.
+pub async fn resolve_schema(table: &Table) -> Result<Schema> {
+    let timeline = table.get_timeline();
+    match timeline.get_latest_commit_metadata().await {
+        Ok(metadata) => {
+            resolve_schema_from_commit_metadata(&metadata, 
timeline.storage.clone()).await
+        }
+        Err(CoreError::TimelineNoCommit) => {
+            if let Some(create_schema) = 
table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
+                let avro_schema_str = create_schema.to::<String>();
+                let arrow_schema = 
arrow_schema_from_avro_schema_str(&avro_schema_str)?;
+                prepend_meta_fields(SchemaRef::new(arrow_schema))
+            } else {
+                Err(CoreError::SchemaNotFound(
+                    "No completed commit, and no create schema for the 
table.".to_string(),
+                ))
+            }
+        }
+        Err(e) => Err(e),
+    }
+}
+
+/// Resolves the [`apache_avro::schema::Schema`] as a [`String`] for a given 
Hudi table.
+///
+/// The resolution process follows these steps:
+/// - If the timeline has commit metadata, read the schema field from it.
+/// - If the timeline has no commit metadata, read 
[`HudiTableConfig::CreateSchema`] from `hoodie.properties`.
+///
+/// ### Note
+///
+/// - For resolving Avro schema, we don't read the schema from a base file 
like we do when resolving Arrow schema.
+/// - Avro schema does not contain [`MetaField`]s.
+pub async fn resolve_avro_schema(table: &Table) -> Result<String> {
+    let timeline = table.get_timeline();
+    match timeline.get_latest_commit_metadata().await {
+        Ok(metadata) => resolve_avro_schema_from_commit_metadata(&metadata),
+        Err(CoreError::TimelineNoCommit) => {
+            if let Some(create_schema) = 
table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
+                let create_schema = create_schema.to::<String>();
+                Ok(sanitize_avro_schema_str(&create_schema))
+            } else {
+                Err(CoreError::SchemaNotFound(
+                    "No completed commit, and no create schema for the 
table.".to_string(),
+                ))
+            }
+        }
+        Err(e) => Err(e),
+    }
+}
+
+pub(crate) async fn resolve_schema_from_commit_metadata(
+    commit_metadata: &Map<String, Value>,
+    storage: Arc<Storage>,
+) -> Result<Schema> {
+    let avro_schema_str = match 
resolve_avro_schema_from_commit_metadata(commit_metadata) {
+        Ok(s) => s,
+        Err(CoreError::SchemaNotFound(_)) => {
+            return resolve_schema_from_base_file(commit_metadata, 
storage).await
+        }
+        Err(e) => return Err(e),
+    };
+
+    let arrow_schema = arrow_schema_from_avro_schema_str(&avro_schema_str)?;
+    prepend_meta_fields(SchemaRef::new(arrow_schema))
+}
+
+pub(crate) fn resolve_avro_schema_from_commit_metadata(
+    commit_metadata: &Map<String, Value>,
+) -> Result<String> {
+    if commit_metadata.is_empty() {
+        return Err(CoreError::CommitMetadata(
+            "Commit metadata is empty.".to_string(),
+        ));
+    }
+
+    match extract_avro_schema_from_commit_metadata(commit_metadata) {
+        Some(schema) => Ok(schema),
+        None => Err(CoreError::SchemaNotFound(
+            "No schema found in the commit metadata.".to_string(),
+        )),
+    }
+}
+
+async fn resolve_schema_from_base_file(
+    commit_metadata: &Map<String, Value>,
+    storage: Arc<Storage>,
+) -> Result<Schema> {
+    let first_partition = commit_metadata
+        .get("partitionToWriteStats")
+        .and_then(|v| v.as_object());
+
+    let partition_path = first_partition
+        .and_then(|obj| obj.keys().next())
+        .ok_or_else(|| {
+            CoreError::CommitMetadata(
+                "Failed to resolve the latest schema: no write status in 
commit metadata"
+                    .to_string(),
+            )
+        })?;
+
+    let first_value = first_partition
+        .and_then(|obj| obj.values().next())
+        .and_then(|value| value.as_array())
+        .and_then(|arr| arr.first());
+
+    let base_file_path = first_value.and_then(|v| v["path"].as_str());
+    match base_file_path {
+        Some(path) if path.ends_with(".parquet") => {
+            Ok(storage.get_parquet_file_schema(path).await?)
+        }
+        Some(_) => {
+            // deltacommit case
+            // TODO: properly parse deltacommit structure
+            let base_file = first_value
+                .and_then(|v| v["baseFile"].as_str())
+                .ok_or_else(|| {
+                    CoreError::CommitMetadata(
+                        "Failed to resolve the latest schema: no file path 
found".to_string(),
+                    )
+                })?;
+            let parquet_file_path_buf = PathBuf::from_str(partition_path)
+                .map_err(|e| {
+                    CoreError::CommitMetadata(format!("Failed to resolve the 
latest schema: {}", e))
+                })?
+                .join(base_file);
+            let path = parquet_file_path_buf.to_str().ok_or_else(|| {
+                CoreError::CommitMetadata(
+                    "Failed to resolve the latest schema: invalid file 
path".to_string(),
+                )
+            })?;
+            Ok(storage.get_parquet_file_schema(path).await?)
+        }
+        None => Err(CoreError::CommitMetadata(
+            "Failed to resolve the latest schema: no file path 
found".to_string(),
+        )),
+    }
+}
+
+fn sanitize_avro_schema_str(avro_schema_str: &str) -> String {
+    avro_schema_str.trim().replace("\\:", ":")
+}
+
+fn arrow_schema_from_avro_schema_str(avro_schema_str: &str) -> Result<Schema> {
+    let s = sanitize_avro_schema_str(avro_schema_str);
+    let avro_schema = AvroSchema::parse_str(&s)
+        .map_err(|e| CoreError::Schema(format!("Failed to parse Avro schema: 
{}", e)))?;
+
+    to_arrow_schema(&avro_schema)
+}
+
+fn extract_avro_schema_from_commit_metadata(
+    commit_metadata: &Map<String, Value>,
+) -> Option<String> {
+    commit_metadata
+        .get("extraMetadata")
+        .and_then(|v| v.as_object())
+        .and_then(|obj| {
+            obj.get("schema")
+                .and_then(|v| v.as_str())
+                .map(|s| s.to_string())
+        })
+}
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index d123126..8b5bf92 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -233,8 +233,9 @@ impl OptionResolver {
         let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?;
         let table_properties = parse_data_for_options(&bytes, "=")?;
 
-        // We currently treat all table properties as the highest precedence, 
which is valid for most cases.
-        // TODO: handle the case where the same key is present in both table 
properties and options
+        // Table properties on storage (hoodie.properties) should have the 
highest precedence,
+        // except for writer-changeable properties like enabling metadata 
table/indexes.
+        // TODO: return err when user-provided options conflict with table 
properties
         for (k, v) in table_properties {
             options.insert(k.to_string(), v.to_string());
         }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 949ca80..7d281f7 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -100,6 +100,7 @@ use crate::config::HudiConfigs;
 use crate::expr::filter::{from_str_tuples, Filter};
 use crate::file_group::file_slice::FileSlice;
 use crate::file_group::reader::FileGroupReader;
+use crate::schema::resolver::{resolve_avro_schema, resolve_schema};
 use crate::table::builder::TableBuilder;
 use crate::table::fs_view::FileSystemView;
 use crate::table::partition::PartitionPruner;
@@ -218,8 +219,17 @@ impl Table {
     }
 
     /// Get the latest Avro schema string of the table.
+    ///
+    /// The implementation looks for the schema in the following order:
+    /// 1. Timeline commit metadata.
+    /// 2. `hoodie.properties` file's [HudiTableConfig::CreateSchema].
+    ///
+    /// ### Note
+    ///
+    /// The schema returned does not contain Hudi's [MetaField]s,
+    /// which is different from the one returned by [Table::get_schema].
     pub async fn get_avro_schema(&self) -> Result<String> {
-        self.timeline.get_latest_avro_schema().await
+        resolve_avro_schema(self).await
     }
 
     /// Same as [Table::get_avro_schema], but blocking.
@@ -231,8 +241,13 @@ impl Table {
     }
 
     /// Get the latest [arrow_schema::Schema] of the table.
+    ///
+    /// The implementation looks for the schema in the following order:
+    /// 1. Timeline commit metadata.
+    /// 2. Base file schema.
+    /// 3. `hoodie.properties` file's [HudiTableConfig::CreateSchema].
     pub async fn get_schema(&self) -> Result<Schema> {
-        self.timeline.get_latest_schema().await
+        resolve_schema(self).await
     }
 
     /// Same as [Table::get_schema], but blocking.
@@ -706,9 +721,11 @@ mod tests {
     };
     use crate::config::util::{empty_filters, empty_options};
     use crate::config::HUDI_CONF_DIR;
+    use crate::error::CoreError;
+    use crate::metadata::meta_field::MetaField;
     use crate::storage::util::join_url_segments;
     use crate::storage::Storage;
-    use hudi_test::SampleTable;
+    use hudi_test::{assert_arrow_field_names_eq, assert_avro_field_names_eq, 
SampleTable};
     use std::collections::HashSet;
     use std::fs::canonicalize;
     use std::path::PathBuf;
@@ -784,67 +801,88 @@ mod tests {
         }
     }
 
+    #[test]
+    fn hudi_table_get_schema_from_empty_table_without_create_schema() {
+        let table = 
get_test_table_without_validation("table_props_no_create_schema");
+
+        let schema = table.get_schema_blocking();
+        assert!(schema.is_err());
+        assert!(matches!(schema.unwrap_err(), CoreError::SchemaNotFound(_)));
+
+        let schema = table.get_avro_schema_blocking();
+        assert!(schema.is_err());
+        assert!(matches!(schema.unwrap_err(), CoreError::SchemaNotFound(_)));
+    }
+
+    #[test]
+    fn 
hudi_table_get_schema_from_empty_table_resolves_to_table_create_schema() {
+        for base_url in SampleTable::V6Empty.urls() {
+            let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+
+            // Validate the Arrow schema
+            let schema = hudi_table.get_schema_blocking();
+            assert!(schema.is_ok());
+            let schema = schema.unwrap();
+            assert_arrow_field_names_eq!(
+                schema,
+                [MetaField::field_names(), vec!["id", "name", 
"isActive"]].concat()
+            );
+
+            // Validate the Avro schema
+            let avro_schema = hudi_table.get_avro_schema_blocking();
+            assert!(avro_schema.is_ok());
+            let avro_schema = avro_schema.unwrap();
+            assert_avro_field_names_eq!(&avro_schema, ["id", "name", 
"isActive"])
+        }
+    }
+
     #[test]
     fn hudi_table_get_schema() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
         let hudi_table = Table::new_blocking(base_url.path()).unwrap();
-        let fields: Vec<String> = hudi_table
-            .get_schema_blocking()
-            .unwrap()
-            .flattened_fields()
-            .into_iter()
-            .map(|f| f.name().to_string())
-            .collect();
-        assert_eq!(
-            fields,
-            vec![
-                "_hoodie_commit_time",
-                "_hoodie_commit_seqno",
-                "_hoodie_record_key",
-                "_hoodie_partition_path",
-                "_hoodie_file_name",
-                "id",
-                "name",
-                "isActive",
-                "byteField",
-                "shortField",
-                "intField",
-                "longField",
-                "floatField",
-                "doubleField",
-                "decimalField",
-                "dateField",
-                "timestampField",
-                "binaryField",
-                "arrayField",
-                "element",
-                "arr_struct_f1",
-                "arr_struct_f2",
-                "mapField",
-                "map_field_value_struct_f1",
-                "map_field_value_struct_f2",
-                "structField",
-                "field1",
-                "field2",
-                "child_struct",
-                "child_field1",
-                "child_field2"
-            ]
+        let original_field_names = [
+            "id",
+            "name",
+            "isActive",
+            "byteField",
+            "shortField",
+            "intField",
+            "longField",
+            "floatField",
+            "doubleField",
+            "decimalField",
+            "dateField",
+            "timestampField",
+            "binaryField",
+            "arrayField",
+            "mapField",
+            "structField",
+        ];
+
+        // Check Arrow schema
+        let arrow_schema = hudi_table.get_schema_blocking();
+        assert!(arrow_schema.is_ok());
+        let arrow_schema = arrow_schema.unwrap();
+        assert_arrow_field_names_eq!(
+            arrow_schema,
+            [MetaField::field_names(), original_field_names.to_vec()].concat()
         );
+
+        // Check Avro schema
+        let avro_schema = hudi_table.get_avro_schema_blocking();
+        assert!(avro_schema.is_ok());
+        let avro_schema = avro_schema.unwrap();
+        assert_avro_field_names_eq!(&avro_schema, original_field_names);
     }
 
     #[test]
     fn hudi_table_get_partition_schema() {
         let base_url = SampleTable::V6TimebasedkeygenNonhivestyle.url_to_cow();
         let hudi_table = Table::new_blocking(base_url.path()).unwrap();
-        let fields: Vec<String> = hudi_table
-            .get_partition_schema_blocking()
-            .unwrap()
-            .flattened_fields()
-            .into_iter()
-            .map(|f| f.name().to_string())
-            .collect();
-        assert_eq!(fields, vec!["ts_str"]);
+        let schema = hudi_table.get_partition_schema_blocking();
+        assert!(schema.is_ok());
+        let schema = schema.unwrap();
+        assert_arrow_field_names_eq!(schema, ["ts_str"]);
     }
 
     #[test]
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index b76747b..094c07d 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -20,25 +20,24 @@ pub mod instant;
 pub(crate) mod selector;
 pub(crate) mod util;
 
-use crate::avro_to_arrow::to_arrow_schema;
 use crate::config::HudiConfigs;
 use crate::error::CoreError;
 use crate::file_group::builder::{build_file_groups, 
build_replaced_file_groups, FileGroupMerger};
 use crate::file_group::FileGroup;
 use crate::metadata::HUDI_METADATA_DIR;
-use crate::schema::prepend_meta_fields;
+use crate::schema::resolver::{
+    resolve_avro_schema_from_commit_metadata, 
resolve_schema_from_commit_metadata,
+};
 use crate::storage::Storage;
 use crate::timeline::instant::Action;
 use crate::timeline::selector::TimelineSelector;
 use crate::Result;
-use arrow_schema::{Schema, SchemaRef};
+use arrow_schema::Schema;
 use instant::Instant;
 use log::debug;
 use serde_json::{Map, Value};
 use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
-use std::path::PathBuf;
-use std::str::FromStr;
 use std::sync::Arc;
 
 /// A [Timeline] contains transaction logs of all actions performed on the 
table at different [Instant]s of time.
@@ -214,10 +213,10 @@ impl Timeline {
             .map_err(|e| CoreError::Timeline(format!("Failed to get commit 
metadata: {}", e)))
     }
 
-    async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> {
+    pub(crate) async fn get_latest_commit_metadata(&self) -> 
Result<Map<String, Value>> {
         match self.completed_commits.iter().next_back() {
             Some(instant) => self.get_instant_metadata(instant).await,
-            None => Ok(Map::new()),
+            None => Err(CoreError::TimelineNoCommit),
         }
     }
 
@@ -232,101 +231,28 @@ impl Timeline {
     ///
     /// Only completed commits are considered.
     pub fn get_latest_commit_timestamp(&self) -> Result<String> {
-        self.get_latest_commit_timestamp_as_option().map_or_else(
-            || Err(CoreError::Timeline("No commits found".to_string())),
-            |t| Ok(t.to_string()),
-        )
-    }
-
-    fn extract_avro_schema_from_commit_metadata(
-        commit_metadata: &Map<String, Value>,
-    ) -> Option<String> {
-        commit_metadata
-            .get("extraMetadata")
-            .and_then(|v| v.as_object())
-            .and_then(|obj| {
-                obj.get("schema")
-                    .and_then(|v| v.as_str())
-                    .map(|s| s.to_string())
-            })
+        self.get_latest_commit_timestamp_as_option()
+            .map_or_else(|| Err(CoreError::TimelineNoCommit), |t| 
Ok(t.to_string()))
     }
 
-    /// Get the latest Avro schema string from the [Timeline].
+    /// Get the latest [apache_avro::schema::Schema] as [String] from the 
[Timeline].
+    ///
+    /// ### Note
+    /// This API behaves differently from 
[crate::table::Table::get_avro_schema],
+    /// which additionally looks for [HudiTableConfig::CreateSchema] in the 
table config.
     pub async fn get_latest_avro_schema(&self) -> Result<String> {
         let commit_metadata = self.get_latest_commit_metadata().await?;
-        
Self::extract_avro_schema_from_commit_metadata(&commit_metadata).ok_or_else(|| {
-            CoreError::CommitMetadata(
-                "Failed to resolve the latest schema: no schema 
found".to_string(),
-            )
-        })
+        resolve_avro_schema_from_commit_metadata(&commit_metadata)
     }
 
     /// Get the latest [arrow_schema::Schema] from the [Timeline].
+    ///
+    /// ### Note
+    /// This API behaves differently from [crate::table::Table::get_schema],
+    /// which additionally looks for [HudiTableConfig::CreateSchema] in the 
table config.
     pub async fn get_latest_schema(&self) -> Result<Schema> {
         let commit_metadata = self.get_latest_commit_metadata().await?;
-
-        if let Some(avro_schema) = 
Self::extract_avro_schema_from_commit_metadata(&commit_metadata)
-        {
-            let avro_schema = 
apache_avro::schema::Schema::parse_str(&avro_schema)?;
-            let arrow_schema = to_arrow_schema(&avro_schema).map_err(|e| {
-                CoreError::CommitMetadata(format!(
-                    "Failed to convert the latest Avro schema: {}",
-                    e
-                ))
-            })?;
-            return prepend_meta_fields(SchemaRef::new(arrow_schema));
-        }
-
-        let first_partition = commit_metadata
-            .get("partitionToWriteStats")
-            .and_then(|v| v.as_object());
-
-        let partition_path = first_partition
-            .and_then(|obj| obj.keys().next())
-            .ok_or_else(|| {
-                CoreError::CommitMetadata(
-                    "Failed to resolve the latest schema: no partition path 
found".to_string(),
-                )
-            })?;
-
-        let first_value = first_partition
-            .and_then(|obj| obj.values().next())
-            .and_then(|value| value.as_array())
-            .and_then(|arr| arr.first());
-
-        let parquet_path = first_value.and_then(|v| v["path"].as_str());
-        match parquet_path {
-            Some(path) if path.ends_with(".parquet") => {
-                Ok(self.storage.get_parquet_file_schema(path).await?)
-            }
-            Some(_) => {
-                // TODO: properly handle deltacommit
-                let base_file = first_value
-                    .and_then(|v| v["baseFile"].as_str())
-                    .ok_or_else(|| {
-                        CoreError::CommitMetadata(
-                            "Failed to resolve the latest schema: no file path 
found".to_string(),
-                        )
-                    })?;
-                let parquet_file_path_buf = PathBuf::from_str(partition_path)
-                    .map_err(|e| {
-                        CoreError::CommitMetadata(format!(
-                            "Failed to resolve the latest schema: {}",
-                            e
-                        ))
-                    })?
-                    .join(base_file);
-                let path = parquet_file_path_buf.to_str().ok_or_else(|| {
-                    CoreError::CommitMetadata(
-                        "Failed to resolve the latest schema: invalid file 
path".to_string(),
-                    )
-                })?;
-                Ok(self.storage.get_parquet_file_schema(path).await?)
-            }
-            None => Err(CoreError::CommitMetadata(
-                "Failed to resolve the latest schema: no file path 
found".to_string(),
-            )),
-        }
+        resolve_schema_from_commit_metadata(&commit_metadata, 
self.storage.clone()).await
     }
 
     pub(crate) async fn get_replaced_file_groups_as_of(
@@ -392,7 +318,7 @@ mod tests {
 
     use url::Url;
 
-    use hudi_test::SampleTable;
+    use hudi_test::{assert_arrow_field_names_eq, assert_avro_field_names_eq, 
SampleTable};
 
     use crate::config::table::HudiTableConfig;
     use crate::metadata::meta_field::MetaField;
@@ -420,11 +346,10 @@ mod tests {
         let timeline = create_test_timeline(base_url).await;
         let table_schema = timeline.get_latest_schema().await;
         assert!(table_schema.is_err());
-        assert!(table_schema
-            .err()
-            .unwrap()
-            .to_string()
-            .starts_with("Commit metadata error: Failed to resolve the latest 
schema:"))
+        assert!(matches!(
+            table_schema.unwrap_err(),
+            CoreError::TimelineNoCommit
+        ))
     }
 
     #[tokio::test]
@@ -473,39 +398,104 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn get_avro_schema() {
+    async fn timeline_get_schema_returns_error_for_no_schema_and_write_stats() 
{
         let base_url = Url::from_file_path(
-            
canonicalize(Path::new("tests/data/timeline/commits_with_valid_schema")).unwrap(),
+            canonicalize(Path::new(
+                "tests/data/timeline/commits_with_no_schema_and_write_stats",
+            ))
+            .unwrap(),
         )
         .unwrap();
         let timeline = create_test_timeline(base_url).await;
 
+        // Check Arrow schema
+        let arrow_schema = timeline.get_latest_schema().await;
+        assert!(arrow_schema.is_err());
+        assert!(matches!(arrow_schema.unwrap_err(), 
CoreError::CommitMetadata(_)), "Getting Arrow schema includes base file lookup, 
therefore expect CommitMetadata error when write stats are missing");
+
+        // Check Avro schema
         let avro_schema = timeline.get_latest_avro_schema().await;
-        assert!(avro_schema.is_ok());
-        assert_eq!(
-            avro_schema.unwrap(),
-            
"{\"type\":\"record\",\"name\":\"v6_trips_record\",\"namespace\":\"hoodie.v6_trips\",\"fields\":[{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"uuid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rider\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"driver\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"fare\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"stri
 [...]
-        )
+        assert!(avro_schema.is_err());
+        assert!(matches!(avro_schema.unwrap_err(), 
CoreError::SchemaNotFound(_)), "Getting Avro schema does not include base file 
lookup, therefore expect SchemaNotFound error when `extraMetadata.schema` is 
missing");
     }
 
     #[tokio::test]
-    async fn get_arrow_schema() {
+    async fn timeline_get_schema_from_commit_metadata() {
         let base_url = Url::from_file_path(
-            
canonicalize(Path::new("tests/data/timeline/commits_with_valid_schema")).unwrap(),
+            canonicalize(Path::new(
+                
"tests/data/timeline/commits_with_valid_schema_in_commit_metadata",
+            ))
+            .unwrap(),
         )
         .unwrap();
         let timeline = create_test_timeline(base_url).await;
 
+        // Check Arrow schema
         let arrow_schema = timeline.get_latest_schema().await;
         assert!(arrow_schema.is_ok());
         let arrow_schema = arrow_schema.unwrap();
-        let fields = arrow_schema
-            .fields
-            .iter()
-            .map(|f| f.name())
-            .collect::<Vec<_>>();
-        let mut expected_fields = MetaField::field_names();
-        expected_fields.extend_from_slice(&["ts", "uuid", "rider", "driver", 
"fare", "city"]);
-        assert_eq!(fields, expected_fields)
+        assert_arrow_field_names_eq!(
+            arrow_schema,
+            [
+                MetaField::field_names(),
+                vec!["ts", "uuid", "rider", "driver", "fare", "city"]
+            ]
+            .concat()
+        );
+
+        // Check Avro schema
+        let avro_schema = timeline.get_latest_avro_schema().await;
+        assert!(avro_schema.is_ok());
+        let avro_schema = avro_schema.unwrap();
+        assert_avro_field_names_eq!(
+            &avro_schema,
+            ["ts", "uuid", "rider", "driver", "fare", "city"]
+        );
+    }
+
+    #[tokio::test]
+    async fn timeline_get_schema_from_empty_commit_metadata() {
+        let base_url = Url::from_file_path(
+            canonicalize(Path::new(
+                "tests/data/timeline/commits_with_empty_commit_metadata",
+            ))
+            .unwrap(),
+        )
+        .unwrap();
+        let timeline = create_test_timeline(base_url).await;
+
+        // Check Arrow schema
+        let result = timeline.get_latest_schema().await;
+        assert!(result.is_err());
+        assert!(matches!(result.unwrap_err(), CoreError::CommitMetadata(_)));
+
+        // Check Avro schema
+        let result = timeline.get_latest_avro_schema().await;
+        assert!(result.is_err());
+        assert!(matches!(result.unwrap_err(), CoreError::CommitMetadata(_)));
+    }
+
+    #[tokio::test]
+    async fn timeline_get_schema_from_base_file() {
+        let timeline_base_urls = [
+            "tests/data/timeline/commits_load_schema_from_base_file_cow",
+            "tests/data/timeline/commits_load_schema_from_base_file_mor",
+        ];
+        for base_url in timeline_base_urls {
+            let base_url = 
Url::from_file_path(canonicalize(Path::new(base_url)).unwrap()).unwrap();
+            let timeline = create_test_timeline(base_url).await;
+
+            let arrow_schema = timeline.get_latest_schema().await;
+            assert!(arrow_schema.is_ok());
+            let arrow_schema = arrow_schema.unwrap();
+            assert_arrow_field_names_eq!(
+                arrow_schema,
+                [
+                    MetaField::field_names(),
+                    vec!["ts", "uuid", "rider", "driver", "fare", "city"]
+                ]
+                .concat()
+            );
+        }
     }
 }
diff --git 
a/crates/core/tests/data/table_props_no_create_schema/.hoodie/hoodie.properties 
b/crates/core/tests/data/table_props_no_create_schema/.hoodie/hoodie.properties
new file mode 100644
index 0000000..af3c8b1
--- /dev/null
+++ 
b/crates/core/tests/data/table_props_no_create_schema/.hoodie/hoodie.properties
@@ -0,0 +1,14 @@
+#Properties saved on 2024-05-10T16:20:08.106Z
+#Fri May 10 11:20:08 CDT 2024
+hoodie.datasource.write.drop.partition.columns=false
+hoodie.table.type=COPY_ON_WRITE
+hoodie.archivelog.folder=archived
+hoodie.timeline.layout.version=1
+hoodie.table.version=6
+hoodie.table.recordkey.fields=id
+hoodie.database.name=default
+hoodie.datasource.write.partitionpath.urlencode=false
+hoodie.table.name=v6_empty
+hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
+hoodie.datasource.write.hive_style_partitioning=true
+hoodie.table.checksum=1980710965
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.commit
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.commit
new file mode 100644
index 0000000..c0f6ce3
--- /dev/null
+++ 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.commit
@@ -0,0 +1,94 @@
+{
+  "partitionToWriteStats" : {
+    "city=san_francisco" : [ {
+      "fileId" : "b271b5f8-29df-463d-ba4d-feedbe6e09ed-0",
+      "path" : 
"city=san_francisco/b271b5f8-29df-463d-ba4d-feedbe6e09ed-0_0-13-35_20250628002223107.parquet",
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 4,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 4,
+      "totalWriteBytes" : 436228,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : "city=san_francisco",
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 436228,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : {
+        "totalScanTime" : 0,
+        "totalUpsertTime" : 0,
+        "totalCreateTime" : 606
+      }
+    } ],
+    "city=sao_paulo" : [ {
+      "fileId" : "a3c5da68-55a5-4804-ab8b-57e75252c69f-0",
+      "path" : 
"city=sao_paulo/a3c5da68-55a5-4804-ab8b-57e75252c69f-0_1-13-36_20250628002223107.parquet",
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 2,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 2,
+      "totalWriteBytes" : 436064,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : "city=sao_paulo",
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 436064,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : {
+        "totalScanTime" : 0,
+        "totalUpsertTime" : 0,
+        "totalCreateTime" : 609
+      }
+    } ],
+    "city=chennai" : [ {
+      "fileId" : "03ffd613-fb74-456e-b6bb-115355d9b0ed-0",
+      "path" : 
"city=chennai/03ffd613-fb74-456e-b6bb-115355d9b0ed-0_2-13-37_20250628002223107.parquet",
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 2,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 2,
+      "totalWriteBytes" : 436045,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : "city=chennai",
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 436045,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : {
+        "totalScanTime" : 0,
+        "totalUpsertTime" : 0,
+        "totalCreateTime" : 606
+      }
+    } ]
+  },
+  "compacted" : false,
+  "extraMetadata" : { },
+  "operationType" : "INSERT"
+}
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.commit.requested
similarity index 100%
copy from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
copy to 
crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.commit.requested
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.inflight
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.inflight
new file mode 100644
index 0000000..1b807d0
--- /dev/null
+++ 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/20250628002223107.inflight
@@ -0,0 +1,82 @@
+{
+  "partitionToWriteStats" : {
+    "city=san_francisco" : [ {
+      "fileId" : "",
+      "path" : null,
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 0,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 4,
+      "totalWriteBytes" : 0,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : null,
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 0,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : null
+    } ],
+    "city=sao_paulo" : [ {
+      "fileId" : "",
+      "path" : null,
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 0,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 2,
+      "totalWriteBytes" : 0,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : null,
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 0,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : null
+    } ],
+    "city=chennai" : [ {
+      "fileId" : "",
+      "path" : null,
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 0,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 2,
+      "totalWriteBytes" : 0,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : null,
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 0,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : null
+    } ]
+  },
+  "compacted" : false,
+  "extraMetadata" : { },
+  "operationType" : "INSERT"
+}
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=chennai/03ffd613-fb74-456e-b6bb-115355d9b0ed-0_2-13-37_20250628002223107.parquet
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=chennai/03ffd613-fb74-456e-b6bb-115355d9b0ed-0_2-13-37_20250628002223107.parquet
new file mode 100644
index 0000000..33f7b25
Binary files /dev/null and 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=chennai/03ffd613-fb74-456e-b6bb-115355d9b0ed-0_2-13-37_20250628002223107.parquet
 differ
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=san_francisco/b271b5f8-29df-463d-ba4d-feedbe6e09ed-0_0-13-35_20250628002223107.parquet
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=san_francisco/b271b5f8-29df-463d-ba4d-feedbe6e09ed-0_0-13-35_20250628002223107.parquet
new file mode 100644
index 0000000..7394806
Binary files /dev/null and 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=san_francisco/b271b5f8-29df-463d-ba4d-feedbe6e09ed-0_0-13-35_20250628002223107.parquet
 differ
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=sao_paulo/a3c5da68-55a5-4804-ab8b-57e75252c69f-0_1-13-36_20250628002223107.parquet
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=sao_paulo/a3c5da68-55a5-4804-ab8b-57e75252c69f-0_1-13-36_20250628002223107.parquet
new file mode 100644
index 0000000..4071d25
Binary files /dev/null and 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/city=sao_paulo/a3c5da68-55a5-4804-ab8b-57e75252c69f-0_1-13-36_20250628002223107.parquet
 differ
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit
new file mode 100644
index 0000000..85e86f2
--- /dev/null
+++ 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit
@@ -0,0 +1,94 @@
+{
+  "partitionToWriteStats" : {
+    "city=san_francisco" : [ {
+      "fileId" : "d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0",
+      "path" : 
"city=san_francisco/d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_0-13-60_20250331030642808.parquet",
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 4,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 4,
+      "totalWriteBytes" : 436406,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : "city=san_francisco",
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 436406,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : {
+        "totalScanTime" : 0,
+        "totalUpsertTime" : 0,
+        "totalCreateTime" : 491
+      }
+    } ],
+    "city=sao_paulo" : [ {
+      "fileId" : "061498b3-e8ef-42f9-9d17-a509b2779501-0",
+      "path" : 
"city=sao_paulo/061498b3-e8ef-42f9-9d17-a509b2779501-0_1-13-61_20250331030642808.parquet",
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 2,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 2,
+      "totalWriteBytes" : 436205,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : "city=sao_paulo",
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 436205,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : {
+        "totalScanTime" : 0,
+        "totalUpsertTime" : 0,
+        "totalCreateTime" : 491
+      }
+    } ],
+    "city=chennai" : [ {
+      "fileId" : "84e82649-b1ee-4a25-a316-17cc6872616b-0",
+      "path" : 
"city=chennai/84e82649-b1ee-4a25-a316-17cc6872616b-0_2-13-62_20250331030642808.parquet",
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 2,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 2,
+      "totalWriteBytes" : 436184,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : "city=chennai",
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 436184,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : {
+        "totalScanTime" : 0,
+        "totalUpsertTime" : 0,
+        "totalCreateTime" : 491
+      }
+    } ]
+  },
+  "compacted" : false,
+  "extraMetadata" : { },
+  "operationType" : "UPSERT"
+}
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit.inflight
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit.inflight
new file mode 100644
index 0000000..976e451
--- /dev/null
+++ 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit.inflight
@@ -0,0 +1,82 @@
+{
+  "partitionToWriteStats" : {
+    "city=san_francisco" : [ {
+      "fileId" : "",
+      "path" : null,
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 0,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 4,
+      "totalWriteBytes" : 0,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : null,
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 0,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : null
+    } ],
+    "city=sao_paulo" : [ {
+      "fileId" : "",
+      "path" : null,
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 0,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 2,
+      "totalWriteBytes" : 0,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : null,
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 0,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : null
+    } ],
+    "city=chennai" : [ {
+      "fileId" : "",
+      "path" : null,
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 0,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 2,
+      "totalWriteBytes" : 0,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : null,
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 0,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : null
+    } ]
+  },
+  "compacted" : false,
+  "extraMetadata" : { },
+  "operationType" : "UPSERT"
+}
\ No newline at end of file
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit.requested
similarity index 100%
copy from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
copy to 
crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030642808.deltacommit.requested
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit
new file mode 100644
index 0000000..f0b09a1
--- /dev/null
+++ 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit
@@ -0,0 +1,43 @@
+{
+  "partitionToWriteStats" : {
+    "city=san_francisco" : [ {
+      "fileId" : "d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0",
+      "path" : 
"city=san_francisco/.d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_20250331030642808.log.1_0-26-85",
+      "cdcStats" : null,
+      "prevCommit" : "20250331030642808",
+      "numWrites" : 1,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 1,
+      "numInserts" : 0,
+      "totalWriteBytes" : 1148,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : "city=san_francisco",
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 1148,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : {
+        "totalScanTime" : 0,
+        "totalUpsertTime" : 296,
+        "totalCreateTime" : 0
+      },
+      "logVersion" : 1,
+      "logOffset" : 0,
+      "baseFile" : 
"d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_0-13-60_20250331030642808.parquet",
+      "logFiles" : [ 
".d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_20250331030642808.log.1_0-26-85" ],
+      "recordsStats" : {
+        "val" : null
+      }
+    } ]
+  },
+  "compacted" : false,
+  "extraMetadata" : { },
+  "operationType" : "UPSERT_PREPPED"
+}
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit.inflight
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit.inflight
new file mode 100644
index 0000000..6ce54c9
--- /dev/null
+++ 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit.inflight
@@ -0,0 +1,56 @@
+{
+  "partitionToWriteStats" : {
+    "city=san_francisco" : [ {
+      "fileId" : "",
+      "path" : null,
+      "cdcStats" : null,
+      "prevCommit" : "null",
+      "numWrites" : 0,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 0,
+      "numInserts" : 0,
+      "totalWriteBytes" : 0,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : null,
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 0,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : null
+    }, {
+      "fileId" : "d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0",
+      "path" : null,
+      "cdcStats" : null,
+      "prevCommit" : "20250331030642808",
+      "numWrites" : 0,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 1,
+      "numInserts" : 0,
+      "totalWriteBytes" : 0,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : null,
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 0,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : null
+    } ]
+  },
+  "compacted" : false,
+  "extraMetadata" : { },
+  "operationType" : "UPSERT_PREPPED"
+}
\ No newline at end of file
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit.requested
similarity index 100%
copy from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
copy to 
crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/20250331030645735.deltacommit.requested
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/city=san_francisco/.d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_20250331030642808.log.1_0-26-85
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/city=san_francisco/.d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_20250331030642808.log.1_0-26-85
new file mode 100644
index 0000000..95a99ea
Binary files /dev/null and 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/city=san_francisco/.d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_20250331030642808.log.1_0-26-85
 differ
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/city=san_francisco/d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_0-13-60_20250331030642808.parquet
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/city=san_francisco/d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_0-13-60_20250331030642808.parquet
new file mode 100644
index 0000000..4262432
Binary files /dev/null and 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/city=san_francisco/d0304c53-6fd2-4b7a-a9d6-5ff632f79224-0_0-13-60_20250331030642808.parquet
 differ
diff --git 
a/crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.commit
 
b/crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.commit
new file mode 100644
index 0000000..0967ef4
--- /dev/null
+++ 
b/crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.commit
@@ -0,0 +1 @@
+{}
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
 
b/crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.commit.requested
similarity index 100%
copy from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
copy to 
crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.commit.requested
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
 
b/crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.inflight
similarity index 100%
copy from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
copy to 
crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/20240402144910683.inflight
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
 
b/crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/hoodie.properties
similarity index 100%
copy from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
copy to 
crates/core/tests/data/timeline/commits_with_empty_commit_metadata/.hoodie/hoodie.properties
diff --git 
a/crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.commit
 
b/crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.commit
new file mode 100644
index 0000000..4d59d6a
--- /dev/null
+++ 
b/crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.commit
@@ -0,0 +1,6 @@
+{
+  "partitionToWriteStats" : { },
+  "compacted" : false,
+  "extraMetadata" : { },
+  "operationType" : "BULK_INSERT"
+}
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
 
b/crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.commit.requested
similarity index 100%
copy from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
copy to 
crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.commit.requested
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
 
b/crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.inflight
similarity index 100%
copy from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
copy to 
crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/20240402144910683.inflight
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
 
b/crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/hoodie.properties
similarity index 100%
copy from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
copy to 
crates/core/tests/data/timeline/commits_with_no_schema_and_write_stats/.hoodie/hoodie.properties
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit
 
b/crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/20240402144910683.commit
similarity index 100%
rename from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit
rename to 
crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/20240402144910683.commit
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
 
b/crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/20240402144910683.commit.requested
similarity index 100%
rename from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.commit.requested
rename to 
crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/20240402144910683.commit.requested
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
 
b/crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/20240402144910683.inflight
similarity index 100%
rename from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/20240402144910683.inflight
rename to 
crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/20240402144910683.inflight
diff --git 
a/crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
 
b/crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/hoodie.properties
similarity index 100%
rename from 
crates/core/tests/data/timeline/commits_with_valid_schema/.hoodie/hoodie.properties
rename to 
crates/core/tests/data/timeline/commits_with_valid_schema_in_commit_metadata/.hoodie/hoodie.properties
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index f3a0601..b22a30d 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -337,6 +337,8 @@ mod tests {
 
     use datafusion::logical_expr::BinaryExpr;
     use hudi_core::config::read::HudiReadConfig::InputPartitions;
+    use hudi_core::metadata::meta_field::MetaField;
+    use hudi_test::assert_arrow_field_names_eq;
     use hudi_test::SampleTable::{
        V6ComplexkeygenHivestyle, V6Empty, V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields,
         V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
@@ -358,13 +360,16 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_get_empty_schema_from_empty_table() {
+    async fn test_get_create_schema_from_empty_table() {
         let table_provider =
            HudiDataSource::new_with_options(V6Empty.path_to_cow().as_str(), empty_options())
                 .await
                 .unwrap();
         let schema = table_provider.schema();
-        assert!(schema.fields().is_empty());
+        assert_arrow_field_names_eq!(
+            schema,
+            [MetaField::field_names(), vec!["id", "name", "isActive"]].concat()
+        );
     }
 
     async fn register_test_table_with_session<I, K, V>(
diff --git a/crates/test/src/util.rs b/crates/test/src/util.rs
index 87c4889..98a84cd 100644
--- a/crates/test/src/util.rs
+++ b/crates/test/src/util.rs
@@ -65,3 +65,35 @@ pub fn set_fixed_timezone(tz: &str) {
 pub fn reset_timezone() {
     env::remove_var("TZ");
 }
+
+#[macro_export]
+macro_rules! assert_arrow_field_names_eq {
+    ($schema:expr, $expected:expr) => {{
+        let actual: Vec<_> = $schema.fields().iter().map(|f| f.name()).collect();
+        assert_eq!(
+            actual, $expected,
+            "Schema field names do not match expected fields.\nActual: 
{:?}\nExpected: {:?}",
+            actual, $expected
+        );
+    }};
+}
+
+#[macro_export]
+macro_rules! assert_avro_field_names_eq {
+    ($schema:expr, $expected:expr) => {{
+        let schema_json_value = serde_json::from_str::<serde_json::Value>($schema).unwrap();
+        let actual = schema_json_value
+            .get("fields")
+            .unwrap()
+            .as_array()
+            .unwrap()
+            .iter()
+            .map(|f| f.get("name").unwrap().as_str().unwrap())
+            .collect::<Vec<_>>();
+        assert_eq!(
+            actual, $expected,
+            "Schema field names do not match expected fields.\nActual: 
{:?}\nExpected: {:?}",
+            actual, $expected
+        );
+    }};
+}

Reply via email to